Parallel Ethereum protocols (BLS signature and KZG) (#279)

* BLS sig: parallel batch verification

* BLS: speed up parallel batch verify with Miller loops on local threads

* shutdown bench

* nit: import style

* implement parallel KZG

* Parallel KZG commitments

* add benchmarks of KZG

* rename protocol file

* small optim: reorder await

* fix rebase

* Faster parallel BLS verification

* fix commitment status replacing previous error in verify_blob_kzg_proof_batch_parallel

* 2x faster parallel EC sum for less than 8192 points
Mamy Ratsimbazafy 2023-10-06 07:58:20 +00:00 committed by GitHub
parent f9258531f9
commit 0f9b9e9606
34 changed files with 1989 additions and 131 deletions

View File

@ -73,14 +73,14 @@ proc main() =
for numPoints in testNumPoints:
let batchIters = max(1, Iters div numPoints)
multiAddParallelBench(ECP_ShortW_Jac[Fp[curve], G1], numPoints, batchIters)
separator()
for numPoints in testNumPoints:
let batchIters = max(1, Iters div numPoints)
multiAddBench(ECP_ShortW_JacExt[Fp[curve], G1], numPoints, useBatching = false, batchIters)
separator()
for numPoints in testNumPoints:
let batchIters = max(1, Iters div numPoints)
multiAddBench(ECP_ShortW_JacExt[Fp[curve], G1], numPoints, useBatching = true, batchIters)
# separator()
# for numPoints in testNumPoints:
# let batchIters = max(1, Iters div numPoints)
# multiAddBench(ECP_ShortW_JacExt[Fp[curve], G1], numPoints, useBatching = false, batchIters)
# separator()
# for numPoints in testNumPoints:
# let batchIters = max(1, Iters div numPoints)
# multiAddBench(ECP_ShortW_JacExt[Fp[curve], G1], numPoints, useBatching = true, batchIters)
separator()
separator()

View File

@ -43,7 +43,7 @@ proc multiAddParallelBench*(EC: typedesc, numPoints: int, iters: int) =
var r{.noInit.}: EC
var tp = Threadpool.new()
let tp = Threadpool.new()
bench("EC parallel batch add (" & align($tp.numThreads, 2) & " threads) " & $EC.G & " (" & $numPoints & " points)", EC, iters):
tp.sum_reduce_vartime_parallel(r, points)

View File

@ -9,22 +9,25 @@
import
# Internals
../constantine/[
ethereum_bls_signatures,
ethereum_bls_signatures_parallel,
ethereum_eip2333_bls12381_key_derivation],
../constantine/math/arithmetic,
../constantine/threadpool/threadpool,
# Std
std/[os, cpuinfo],
# Helpers
../helpers/prng_unsafe,
./bench_blueprint
proc separator*() = separator(167)
proc separator*() = separator(180)
proc report(op, curve: string, startTime, stopTime: MonoTime, startClk, stopClk: int64, iters: int) =
let ns = inNanoseconds((stopTime-startTime) div iters)
let throughput = 1e9 / float64(ns)
when SupportsGetTicks:
echo &"{op:<75} {curve:<15} {throughput:>15.3f} ops/s {ns:>9} ns/op {(stopClk - startClk) div iters:>9} CPU cycles (approx)"
echo &"{op:<88} {curve:<15} {throughput:>15.3f} ops/s {ns:>9} ns/op {(stopClk - startClk) div iters:>9} CPU cycles (approx)"
else:
echo &"{op:<75} {curve:<15} {throughput:>15.3f} ops/s {ns:>9} ns/op"
echo &"{op:<8} {curve:<15} {throughput:>15.3f} ops/s {ns:>9} ns/op"
template bench(op: string, curve: string, iters: int, body: untyped): untyped =
measure(iters, startTime, stopTime, startClk, stopClk, body)
@ -184,6 +187,43 @@ proc benchVerifyBatched*(numSigs, iters: int) =
let ok = batch_verify(pubkeys, messages, signatures, secureBlindingBytes)
doAssert ok == cttBLS_Success
proc benchVerifyBatchedParallel*(numSigs, iters: int) =
## Verification of N pubkeys signing for N messages
var
tp: Threadpool
pubkeys: seq[PublicKey]
messages: seq[array[32, byte]]
signatures: seq[Signature]
var hashedMsg: array[32, byte]
var sig: Signature
var numThreads: int
if existsEnv"CTT_NUM_THREADS":
numThreads = getEnv"CTT_NUM_THREADS".parseInt()
else:
numThreads = countProcessors()
tp = Threadpool.new(numThreads)
for i in 0 ..< numSigs:
let (sk, pk) = demoKeyGen()
sha256.hash(hashedMsg, "msg" & $i)
sig.sign(sk, hashedMsg)
pubkeys.add pk
messages.add hashedMsg
signatures.add sig
let secureBlindingBytes = sha256.hash("Mr F was here")
bench("BLS parallel batch verify (" & $tp.numThreads & " threads) of " & $numSigs & " msgs by "& $numSigs & " pubkeys (with blinding)", "BLS12_381", iters):
let ok = tp.batch_verify_parallel(pubkeys, messages, signatures, secureBlindingBytes)
doAssert ok == cttBLS_Success, "invalid status: " & $ok
tp.shutdown()
const Iters = 1000
proc main() =
@ -202,16 +242,19 @@ proc main() =
# Simulate Block verification (at most 6 signatures per block)
benchVerifyMulti(numSigs = 6, iters = 10)
benchVerifyBatched(numSigs = 6, iters = 10)
benchVerifyBatchedParallel(numSigs = 6, iters = 10)
separator()
# Simulate 10 blocks verification
benchVerifyMulti(numSigs = 60, iters = 10)
benchVerifyBatched(numSigs = 60, iters = 10)
benchVerifyBatchedParallel(numSigs = 60, iters = 10)
separator()
# Simulate 30 blocks verification
benchVerifyMulti(numSigs = 180, iters = 10)
benchVerifyBatched(numSigs = 180, iters = 10)
benchVerifyBatchedParallel(numSigs = 180, iters = 10)
separator()
main()

View File

@ -0,0 +1 @@
--threads:on

View File

@ -0,0 +1,231 @@
# Constantine
# Copyright (c) 2018-2019 Status Research & Development GmbH
# Copyright (c) 2020-Present Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
# Internals
../constantine/ethereum_eip4844_kzg_parallel,
../constantine/math/io/io_fields,
../constantine/math/config/[curves, type_ff],
../constantine/threadpool/threadpool,
../constantine/csprngs/sysrand,
../constantine/platforms/primitives,
# Helpers
../helpers/prng_unsafe,
./bench_blueprint
proc separator*() = separator(180)
proc report(op, threads: string, startTime, stopTime: MonoTime, startClk, stopClk: int64, iters: int) =
let ns = inNanoseconds((stopTime-startTime) div iters)
let throughput = 1e9 / float64(ns)
when SupportsGetTicks:
echo &"{op:<40} {threads:<16} {throughput:>15.3f} ops/s {ns:>9} ns/op {(stopClk - startClk) div iters:>9} CPU cycles (approx)"
else:
echo &"{op:<40} {threads:<16} {throughput:>15.3f} ops/s {ns:>9} ns/op"
template bench(op, threads: string, iters: int, body: untyped): untyped =
measure(iters, startTime, stopTime, startClk, stopClk, body)
report(op, threads, startTime, stopTime, startClk, stopClk, iters)
type
BenchSet[N: static int] = ref object
blobs: array[N, Blob]
commitments: array[N, array[48, byte]]
proofs: array[N, array[48, byte]]
# This is only used for `verify_kzg_proof` and
# there is no short-circuit if they don't match
challenge, eval_at_challenge: array[32, byte]
proc new(T: type BenchSet, ctx: ptr EthereumKZGContext): T =
new(result)
for i in 0 ..< result.N:
let t {.noInit.} = rng.random_unsafe(Fr[BLS12_381])
result.blobs[i].marshal(t, bigEndian)
discard ctx.blob_to_kzg_commitment(result.commitments[i], result.blobs[i].addr)
discard ctx.compute_blob_kzg_proof(result.proofs[i], result.blobs[i].addr, result.commitments[i])
let challenge = rng.random_unsafe(Fr[BLS12_381])
let eval_at_challenge = rng.random_unsafe(Fr[BLS12_381])
discard result.challenge.marshal(challenge, bigEndian)
discard result.eval_at_challenge.marshal(eval_at_challenge, bigEndian)
proc benchBlobToKzgCommitment(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) =
let startSerial = getMonotime()
block:
bench("blob_to_kzg_commitment", "serial", iters):
var commitment {.noInit.}: array[48, byte]
doAssert cttEthKZG_Success == ctx.blob_to_kzg_commitment(commitment, b.blobs[0].addr)
let stopSerial = getMonotime()
## We require `tp` to be uninitialized as even idle threads somehow reduce perf of serial benches
let tp = Threadpool.new()
let startParallel = getMonotime()
block:
bench("blob_to_kzg_commitment", $tp.numThreads & " threads", iters):
var commitment {.noInit.}: array[48, byte]
doAssert cttEthKZG_Success == tp.blob_to_kzg_commitment_parallel(ctx, commitment, b.blobs[0].addr)
let stopParallel = getMonotime()
let perfSerial = inNanoseconds((stopSerial-startSerial) div iters)
let perfParallel = inNanoseconds((stopParallel-startParallel) div iters)
let parallelSpeedup = float(perfSerial) / float(perfParallel)
echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x"
proc benchComputeKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) =
let startSerial = getMonotime()
block:
bench("compute_kzg_proof", "serial", iters):
var proof {.noInit.}: array[48, byte]
var eval_at_challenge {.noInit.}: array[32, byte]
doAssert cttEthKZG_Success == ctx.compute_kzg_proof(proof, eval_at_challenge, b.blobs[0].addr, b.challenge)
let stopSerial = getMonotime()
## We require `tp` to be uninitialized as even idle threads somehow reduce perf of serial benches
let tp = Threadpool.new()
let startParallel = getMonotime()
block:
bench("compute_kzg_proof", $tp.numThreads & " threads", iters):
var proof {.noInit.}: array[48, byte]
var eval_at_challenge {.noInit.}: array[32, byte]
doAssert cttEthKZG_Success == tp.compute_kzg_proof_parallel(ctx, proof, eval_at_challenge, b.blobs[0].addr, b.challenge)
let stopParallel = getMonotime()
let perfSerial = inNanoseconds((stopSerial-startSerial) div iters)
let perfParallel = inNanoseconds((stopParallel-startParallel) div iters)
let parallelSpeedup = float(perfSerial) / float(perfParallel)
echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x"
proc benchComputeBlobKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) =
let startSerial = getMonotime()
block:
bench("compute_blob_kzg_proof", "serial", iters):
var proof {.noInit.}: array[48, byte]
doAssert cttEthKZG_Success == ctx.compute_blob_kzg_proof(proof, b.blobs[0].addr, b.commitments[0])
let stopSerial = getMonotime()
## We require `tp` to be uninitialized as even idle threads somehow reduce perf of serial benches
let tp = Threadpool.new()
let startParallel = getMonotime()
block:
bench("compute_blob_kzg_proof", $tp.numThreads & " threads", iters):
var proof {.noInit.}: array[48, byte]
doAssert cttEthKZG_Success == tp.compute_blob_kzg_proof_parallel(ctx, proof, b.blobs[0].addr, b.commitments[0])
let stopParallel = getMonotime()
let perfSerial = inNanoseconds((stopSerial-startSerial) div iters)
let perfParallel = inNanoseconds((stopParallel-startParallel) div iters)
let parallelSpeedup = float(perfSerial) / float(perfParallel)
echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x"
proc benchVerifyKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) =
bench("verify_kzg_proof", "serial", iters):
discard ctx.verify_kzg_proof(b.commitments[0], b.challenge, b.eval_at_challenge, b.proofs[0])
echo "verify_kzg_proof is always serial"
proc benchVerifyBlobKzgProof(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) =
let startSerial = getMonotime()
block:
bench("verify_blob_kzg_proof", "serial", iters):
discard ctx.verify_blob_kzg_proof(b.blobs[0].addr, b.commitments[0], b.proofs[0])
let stopSerial = getMonotime()
## We require `tp` to be uninitialized as even idle threads somehow reduce perf of serial benches
let tp = Threadpool.new()
let startParallel = getMonotime()
block:
bench("verify_blob_kzg_proof", $tp.numThreads & " threads", iters):
discard tp.verify_blob_kzg_proof_parallel(ctx, b.blobs[0].addr, b.commitments[0], b.proofs[0])
let stopParallel = getMonotime()
let perfSerial = inNanoseconds((stopSerial-startSerial) div iters)
let perfParallel = inNanoseconds((stopParallel-startParallel) div iters)
let parallelSpeedup = float(perfSerial) / float(perfParallel)
echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x"
proc benchVerifyBlobKzgProofBatch(b: BenchSet, ctx: ptr EthereumKZGContext, iters: int) =
var secureRandomBytes {.noInit.}: array[32, byte]
discard sysrand(secureRandomBytes)
var i = 1
while i <= b.N:
let startSerial = getMonotime()
block:
bench("verify_blob_kzg_proof (batch " & $i & ')', "serial", iters):
discard verify_blob_kzg_proof_batch(
ctx,
b.blobs.asUnchecked(),
b.commitments.asUnchecked(),
b.proofs.asUnchecked(),
i,
secureRandomBytes)
let stopSerial = getMonotime()
## We require `tp` to be uninitialized as even idle threads somehow reduce perf of serial benches
let tp = Threadpool.new()
let startParallel = getMonotime()
block:
bench("verify_blob_kzg_proof (batch " & $i & ')', $tp.numThreads & " threads", iters):
discard tp.verify_blob_kzg_proof_batch_parallel(
ctx,
b.blobs.asUnchecked(),
b.commitments.asUnchecked(),
b.proofs.asUnchecked(),
i,
secureRandomBytes)
let stopParallel = getMonotime()
let perfSerial = inNanoseconds((stopSerial-startSerial) div iters)
let perfParallel = inNanoseconds((stopParallel-startParallel) div iters)
let parallelSpeedup = float(perfSerial) / float(perfParallel)
echo &"Speedup ratio parallel {tp.numThreads} threads over serial: {parallelSpeedup:>6.3f}x"
echo ""
i *= 2
const Iters = 100
proc main() =
let ctx = load_ethereum_kzg_test_trusted_setup_mainnet()
let b = BenchSet[64].new(ctx)
separator()
benchBlobToKzgCommitment(b, ctx, Iters)
echo ""
benchComputeKzgProof(b, ctx, Iters)
echo ""
benchComputeBlobKzgProof(b, ctx, Iters)
echo ""
benchVerifyKzgProof(b, ctx, Iters)
echo ""
benchVerifyBlobKzgProof(b, ctx, Iters)
echo ""
benchVerifyBlobKzgProofBatch(b, ctx, Iters)
separator()
when isMainModule:
main()

View File

@ -0,0 +1 @@
--threads:on

View File

@ -497,6 +497,7 @@ const testDesc: seq[tuple[path: string, useGMP: bool]] = @[
("tests/t_ethereum_bls_signatures.nim", false),
("tests/t_ethereum_eip2333_bls12381_key_derivation.nim", false),
("tests/t_ethereum_eip4844_deneb_kzg.nim", false),
("tests/t_ethereum_eip4844_deneb_kzg_parallel.nim", false),
]
const testDescNvidia: seq[string] = @[
@ -555,6 +556,7 @@ const benchDesc = [
"bench_sha256",
"bench_hash_to_curve",
"bench_ethereum_bls_signatures",
"bench_ethereum_eip4844_kzg",
"bench_evm_modexp_dos",
"bench_gmp_modexp",
"bench_gmp_modmul"
@ -974,3 +976,8 @@ task bench_hash_to_curve, "Run Hash-to-Curve benchmarks":
# ------------------------------------------
task bench_ethereum_bls_signatures, "Run Ethereum BLS signatures benchmarks - CC compiler":
runBench("bench_ethereum_bls_signatures")
# EIP 4844 - KZG Polynomial Commitments
# ------------------------------------------
task bench_ethereum_eip4844_kzg, "Run Ethereum EIP4844 KZG Polynomial commitment - CC compiler":
runBench("bench_ethereum_eip4844_kzg")

View File

@ -397,6 +397,7 @@ func kzg_verify_batch*[bits: static int, F2; C: static Curve](
freeHeapAligned(commits_min_evals)
# ∑[rᵢ][zᵢ][proofᵢ]₁
# ------------------
var tmp {.noInit.}: Fr[C]
for i in 0 ..< n:
tmp.prod(linearIndepRandNumbers[i], challenges[i])
@ -406,6 +407,7 @@ func kzg_verify_batch*[bits: static int, F2; C: static Curve](
freeHeapAligned(coefs)
# e(∑ [rᵢ][proofᵢ]₁, [τ]₂) . e(∑[rᵢ]([commitmentᵢ]₁ - [eval_at_challengeᵢ]₁) + ∑[rᵢ][zᵢ][proofᵢ]₁, [-1]₂) = 1
# -----------------------------------------------------------------------------------------------------------
template sum_of_sums: untyped = sums_jac[1]
sum_of_sums.sum_vartime(sum_commit_minus_evals_G1, sum_rand_challenge_proofs)

View File

@ -0,0 +1,267 @@
# Constantine
# Copyright (c) 2018-2019 Status Research & Development GmbH
# Copyright (c) 2020-Present Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
../math/config/curves,
../math/[ec_shortweierstrass, arithmetic, extension_fields],
../math/elliptic/[ec_multi_scalar_mul_parallel, ec_shortweierstrass_batch_ops],
../math/pairings/pairings_generic,
../math/constants/zoo_generators,
../math/polynomials/polynomials,
../platforms/[abstractions, views],
../threadpool/threadpool
import ./kzg_polynomial_commitments {.all.}
export kzg_polynomial_commitments
## ############################################################
##
## KZG Polynomial Commitments
## Parallel Edition
##
## ############################################################
# KZG - Prover - Lagrange basis
# ------------------------------------------------------------
proc kzg_commit_parallel*[N: static int, C: static Curve](
tp: Threadpool,
commitment: var ECP_ShortW_Aff[Fp[C], G1],
poly_evals: array[N, BigInt],
powers_of_tau: PolynomialEval[N, G1aff[C]]) =
## KZG Commit to a polynomial in Lagrange / Evaluation form
## Parallelism: This only returns when computation is fully done
var commitmentJac {.noInit.}: ECP_ShortW_Jac[Fp[C], G1]
tp.multiScalarMul_vartime_parallel(commitmentJac, poly_evals, powers_of_tau.evals)
commitment.affine(commitmentJac)
proc kzg_prove_parallel*[N: static int, C: static Curve](
tp: Threadpool,
proof: var ECP_ShortW_Aff[Fp[C], G1],
eval_at_challenge: var Fr[C],
poly: ptr PolynomialEval[N, Fr[C]],
domain: ptr PolyDomainEval[N, Fr[C]],
challenge: ptr Fr[C],
powers_of_tau: PolynomialEval[N, G1aff[C]],
isBitReversedDomain: static bool) =
## KZG prove commitment to a polynomial in Lagrange / Evaluation form
##
## Outputs:
## - proof
## - eval_at_challenge
##
## Parallelism: This only returns when computation is fully done
# Note:
# The order of inputs in
# `kzg_prove`, `evalPolyAt`, `differenceQuotientEvalOffDomain`, `differenceQuotientEvalInDomain`
# minimizes register shuffling when passing parameters.
#
# z = challenge in the following code
let diffQuotientPolyFr = allocHeapAligned(PolynomialEval[N, Fr[C]], alignment = 64)
let invRootsMinusZ = allocHeapAligned(array[N, Fr[C]], alignment = 64)
# Compute 1/(ωⁱ - z) with ω a root of unity, i in [0, N).
# zIndex = i if ωⁱ - z == 0 (it is the i-th root of unity) and -1 otherwise.
let zIndex = invRootsMinusZ[].inverseRootsMinusZ_vartime(
domain[], challenge[],
earlyReturnOnZero = false)
if zIndex == -1:
# p(z)
tp.evalPolyAt_parallel(
eval_at_challenge,
poly, challenge,
invRootsMinusZ,
domain)
# q(x) = (p(x) - p(z)) / (x - z)
tp.differenceQuotientEvalOffDomain_parallel(
diffQuotientPolyFr,
poly, eval_at_challenge.addr, invRootsMinusZ)
else:
# p(z)
# But the challenge z is equal to one of the roots of unity (how likely is that?)
eval_at_challenge = poly.evals[zIndex]
# q(x) = (p(x) - p(z)) / (x - z)
tp.differenceQuotientEvalInDomain_parallel(
diffQuotientPolyFr,
poly, uint32 zIndex, invRootsMinusZ, domain, isBitReversedDomain)
freeHeapAligned(invRootsMinusZ)
const orderBits = C.getCurveOrderBitwidth()
let diffQuotientPolyBigInt = allocHeapAligned(array[N, BigInt[orderBits]], alignment = 64)
syncScope:
tp.parallelFor i in 0 ..< N:
captures: {diffQuotientPolyBigInt, diffQuotientPolyFr}
diffQuotientPolyBigInt[i].fromField(diffQuotientPolyFr.evals[i])
freeHeapAligned(diffQuotientPolyFr)
var proofJac {.noInit.}: ECP_ShortW_Jac[Fp[C], G1]
tp.multiScalarMul_vartime_parallel(proofJac, diffQuotientPolyBigInt[], powers_of_tau.evals)
proof.affine(proofJac)
freeHeapAligned(diffQuotientPolyBigInt)
proc kzg_verify_batch_parallel*[bits: static int, F2; C: static Curve](
tp: Threadpool,
commitments: ptr UncheckedArray[ECP_ShortW_Aff[Fp[C], G1]],
challenges: ptr UncheckedArray[Fr[C]],
evals_at_challenges: ptr UncheckedArray[BigInt[bits]],
proofs: ptr UncheckedArray[ECP_ShortW_Aff[Fp[C], G1]],
linearIndepRandNumbers: ptr UncheckedArray[Fr[C]],
n: int,
tauG2: ECP_ShortW_Aff[F2, G2]): bool {.tags:[HeapAlloc, Alloca, Vartime].} =
## Verify multiple KZG proofs efficiently
##
## Parameters
##
## `n` verification sets
## A verification set i (commitmentᵢ, challengeᵢ, eval_at_challengeᵢ, proofᵢ)
## is passed in a "struct-of-arrays" fashion.
##
## Notation:
## i ∈ [0, n), a verification set with ID i
## [a]₁ corresponds to the scalar multiplication [a]G by the generator G of the group 𝔾1
##
## - `commitments`: `n` commitments [commitmentᵢ]₁
## - `challenges`: `n` challenges zᵢ
## - `evals_at_challenges`: `n` evaluations yᵢ = pᵢ(zᵢ)
## - `proofs`: `n` [proof]₁
## - `linearIndepRandNumbers`: `n` linearly independent numbers that are not under the control
##   of a (potentially malicious) prover.
## - `n`: the number of verification sets
##
## For all (commitmentᵢ, challengeᵢ, eval_at_challengeᵢ, proofᵢ),
## we verify the relation
## proofᵢ.(τ - zᵢ) = pᵢ(τ)-pᵢ(zᵢ)
##
## As τ is the secret from the trusted setup, boxed in [τ]₁ and [τ]₂,
## we rewrite the equality check using pairings
##
## e([proofᵢ]₁, [τ]₂ - [challengeᵢ]₂) . e([commitmentᵢ]₁ - [eval_at_challengeᵢ]₁, [-1]₂) = 1
##
## Or batched using Feist-Khovratovich method
##
## e(∑ [rᵢ][proofᵢ]₁, [τ]₂) . e(∑[rᵢ]([commitmentᵢ]₁ - [eval_at_challengeᵢ]₁) + ∑[rᵢ][zᵢ][proofᵢ]₁, [-1]₂) = 1
##
## Parallelism: This only returns when computation is fully done
static: doAssert BigInt[bits] is matchingOrderBigInt(C)
var sums_jac {.noInit.}: array[2, ECP_ShortW_Jac[Fp[C], G1]]
template sum_rand_proofs: untyped = sums_jac[0]
template sum_commit_minus_evals_G1: untyped = sums_jac[1]
var sum_rand_challenge_proofs {.noInit.}: ECP_ShortW_Jac[Fp[C], G1]
# ∑ [rᵢ][proofᵢ]₁
# ---------------
let coefs = allocHeapArrayAligned(matchingOrderBigInt(C), n, alignment = 64)
syncScope:
tp.parallelFor i in 0 ..< n:
captures: {coefs, linearIndepRandNumbers}
coefs[i].fromField(linearIndepRandNumbers[i])
let sum_rand_proofs_fv = tp.spawnAwaitable tp.multiScalarMul_vartime_parallel(sum_rand_proofs.addr, coefs, proofs, n)
# ∑[rᵢ]([commitmentᵢ]₁ - [eval_at_challengeᵢ]₁)
# ---------------------------------------------
#
# We interleave allocation and deallocation, which hurts cache reuse:
# when allocating, it's better to do all allocations together as the allocator metadata will already be in cache
#
# but it's more important to minimize memory usage especially if we want to commit with 2^26+ points
#
# We dealloc in reverse alloc order, to avoid leaving holes in the allocator pages.
proc compute_sum_commitments_minus_evals(tp: Threadpool,
sum_commit_minus_evals_G1: ptr ECP_ShortW_Jac[Fp[C], G1],
commitments: ptr UncheckedArray[ECP_ShortW_Aff[Fp[C], G1]],
evals_at_challenges: ptr UncheckedArray[BigInt[bits]],
coefs: ptr UncheckedArray[BigInt[bits]],
n: int) {.nimcall.} =
let commits_min_evals = allocHeapArrayAligned(ECP_ShortW_Aff[Fp[C], G1], n, alignment = 64)
let commits_min_evals_jac = allocHeapArrayAligned(ECP_ShortW_Jac[Fp[C], G1], n, alignment = 64)
syncScope:
tp.parallelFor i in 0 ..< n:
captures: {commits_min_evals_jac, commitments, evals_at_challenges}
commits_min_evals_jac[i].fromAffine(commitments[i])
var boxed_eval {.noInit.}: ECP_ShortW_Jac[Fp[C], G1]
boxed_eval.fromAffine(C.getGenerator("G1"))
boxed_eval.scalarMul_vartime(evals_at_challenges[i])
commits_min_evals_jac[i].diff_vartime(commits_min_evals_jac[i], boxed_eval)
commits_min_evals.batchAffine(commits_min_evals_jac, n)
freeHeapAligned(commits_min_evals_jac)
tp.multiScalarMul_vartime(sum_commit_minus_evals_G1, coefs, commits_min_evals, n)
freeHeapAligned(commits_min_evals)
let sum_commit_minus_evals_G1_fv = tp.spawnAwaitable tp.compute_sum_commitments_minus_evals(
sum_commit_minus_evals_G1.addr,
commitments,
evals_at_challenges,
coefs,
n)
# ∑[rᵢ][zᵢ][proofᵢ]₁
# ------------------
proc compute_sum_rand_challenge_proofs(tp: Threadpool,
sum_rand_challenge_proofs: ptr ECP_ShortW_Jac[Fp[C], G1],
linearIndepRandNumbers: ptr UncheckedArray[Fr[C]],
challenges: ptr UncheckedArray[Fr[C]],
proofs: ptr UncheckedArray[ECP_ShortW_Aff[Fp[C], G1]],
n: int) {.nimcall.} =
let rand_coefs = allocHeapArrayAligned(matchingOrderBigInt(C), n, alignment = 64)
let rand_coefs_fr = allocHeapArrayAligned(Fr[C], n, alignment = 64)
syncScope:
tp.parallelFor i in 0 ..< n:
rand_coefs_fr[i].prod(linearIndepRandNumbers[i], challenges[i])
rand_coefs[i].fromField(rand_coefs_fr[i])
tp.multiScalarMul_vartime(sum_rand_challenge_proofs, rand_coefs, proofs, n)
freeHeapAligned(rand_coefs_fr)
freeHeapAligned(rand_coefs)
let sum_rand_challenge_proofs_fv = tp.spawnAwaitable tp.compute_sum_rand_challenge_proofs(
sum_rand_challenge_proofs,
linearIndepRandNumbers,
challenges,
proofs,
n)
# e(∑ [rᵢ][proofᵢ]₁, [τ]₂) . e(∑[rᵢ]([commitmentᵢ]₁ - [eval_at_challengeᵢ]₁) + ∑[rᵢ][zᵢ][proofᵢ]₁, [-1]₂) = 1
# -----------------------------------------------------------------------------------------------------------
template sum_of_sums: untyped = sums_jac[1]
discard sync sum_commit_minus_evals_G1_fv
discard sync sum_rand_challenge_proofs_fv
sum_of_sums.sum_vartime(sum_commit_minus_evals_G1, sum_rand_challenge_proofs)
discard sync sum_rand_proofs_fv
freeHeapAligned(coefs)
var sums {.noInit.}: array[2, ECP_ShortW_Aff[Fp[C], G1]]
sums.batchAffine(sums_jac)
var negG2 {.noInit.}: ECP_ShortW_Aff[F2, G2]
negG2.neg(C.getGenerator("G2"))
var gt {.noInit.}: C.getGT()
gt.pairing(sums, [tauG2, negG2])
return gt.isOne().bool()
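For reference (not part of this diff), a minimal sketch of how a caller could derive the `linearIndepRandNumbers` as successive powers rᵢ = rⁱ of a single blinding scalar, which is what `computePowers` does in the EIP-4844 module; `n` and the random field element `r` are assumed caller inputs:

  # Illustrative sketch only, assuming n > 0 and r: Fr[BLS12_381]
  var powers = newSeq[Fr[BLS12_381]](n)
  powers[0].setOne()                   # r⁰ = 1
  for i in 1 ..< n:
    powers[i].prod(powers[i-1], r)     # rⁱ = rⁱ⁻¹ · r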

View File

@ -0,0 +1,146 @@
# Constantine
# Copyright (c) 2018-2019 Status Research & Development GmbH
# Copyright (c) 2020-Present Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
## ############################################################
##
## BLS Signatures for Ethereum
## Parallel edition
##
## ############################################################
when not compileOption("threads"):
{.error: "This requires --threads:on compilation flag".}
# Reexport the serial API
import ./ethereum_bls_signatures {.all.}
export ethereum_bls_signatures
import
std/importutils,
./zoo_exports,
./platforms/views,
./threadpool/threadpool,
./signatures/bls_signatures_parallel
# No exceptions allowed in core cryptographic operations
{.push raises: [].}
{.push checks: off.}
# C FFI
proc batch_verify_parallel*[Msg](
tp: Threadpool,
pubkeys: ptr UncheckedArray[PublicKey],
messages: ptr UncheckedArray[View[byte]],
signatures: ptr UncheckedArray[Signature],
len: int,
secureRandomBytes: array[32, byte]): CttBLSStatus {.libPrefix: prefix_ffi.} =
## Verify that all (pubkey, message, signature) triplets are valid.
## Returns cttBLS_Success if all signatures are valid, an error status if at least one is invalid.
##
## For message domain separation purpose, the tag is `BLS_SIG_BLS12381G2_XMD:SHA-256_SSWU_RO_POP_`
##
## Input:
## - Public keys initialized by one of the key derivation or deserialization procedure.
## Or validated via validate_pubkey
## - Messages
## - Signatures initialized by one of the key derivation or deserialization procedure.
## Or validated via validate_signature
##
## In particular, the public keys and signature are assumed to be on curve subgroup checked.
##
## To avoid splitting-zeros and rogue-key attacks:
## 1. Cryptographically-secure random bytes must be provided.
## 2. Augmentation or proofs of possession must be used for each public key.
##
## The secureRandomBytes serve as input not under attacker control to foil potential splitting-zeros inputs.
## The scheme assumes that the attacker cannot
## resubmit forged (publickey, message, signature) triplets 2^64 times
## against the same `secureRandomBytes`
privateAccess(PublicKey)
privateAccess(Signature)
if len == 0:
# IETF spec precondition
return cttBLS_ZeroLengthAggregation
# Deal with cases where a pubkey or signature was mistakenly zero-initialized, e.g. due to a failed generic aggregation attempt
for i in 0 ..< len:
if pubkeys[i].raw.isInf().bool:
return cttBLS_PointAtInfinity
for i in 0 ..< len:
if signatures[i].raw.isInf().bool:
return cttBLS_PointAtInfinity
let verified = tp.batchVerify_parallel(
pubkeys.toOpenArray(len).unwrap(),
messages,
signatures.toOpenArray(len).unwrap(),
sha256, 128, DomainSeparationTag, secureRandomBytes)
if verified:
return cttBLS_Success
return cttBLS_VerificationFailure
# Nim
proc batch_verify_parallel*[Msg](
tp: Threadpool,
pubkeys: openArray[PublicKey],
messages: openarray[Msg],
signatures: openArray[Signature],
secureRandomBytes: array[32, byte]): CttBLSStatus =
## Verify that all (pubkey, message, signature) triplets are valid.
## Returns cttBLS_Success if all signatures are valid, an error status if at least one is invalid.
##
## For message domain separation purpose, the tag is `BLS_SIG_BLS12381G2_XMD:SHA-256_SSWU_RO_POP_`
##
## Input:
## - Public keys initialized by one of the key derivation or deserialization procedure.
## Or validated via validate_pubkey
## - Messages
## - Signatures initialized by one of the key derivation or deserialization procedure.
## Or validated via validate_signature
##
## In particular, the public keys and signature are assumed to be on curve subgroup checked.
##
## To avoid splitting-zeros and rogue-key attacks:
## 1. Cryptographically-secure random bytes must be provided.
## 2. Augmentation or proofs of possession must be used for each public key.
##
## The secureRandomBytes serve as input not under attacker control to foil potential splitting-zeros inputs.
## The scheme assumes that the attacker cannot
## resubmit forged (publickey, message, signature) triplets 2^64 times
## against the same `secureRandomBytes`
privateAccess(PublicKey)
privateAccess(Signature)
if pubkeys.len == 0:
# IETF spec precondition
return cttBLS_ZeroLengthAggregation
if pubkeys.len != messages.len or pubkeys.len != signatures.len:
return cttBLS_InconsistentLengthsOfInputs
# Deal with cases where a pubkey or signature was mistakenly zero-initialized, e.g. due to a failed generic aggregation attempt
for i in 0 ..< pubkeys.len:
if pubkeys[i].raw.isInf().bool:
return cttBLS_PointAtInfinity
for i in 0 ..< signatures.len:
if signatures[i].raw.isInf().bool:
return cttBLS_PointAtInfinity
let verified = tp.batchVerify_parallel(
pubkeys.unwrap(),
messages,
signatures.unwrap(),
sha256, 128, DomainSeparationTag, secureRandomBytes)
if verified:
return cttBLS_Success
return cttBLS_VerificationFailure
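A hedged caller-side sketch (not part of this diff) of the openArray overload above, mirroring the benchmark earlier in this commit; `pubkeys`, `messages` and `signatures` are assumed, pre-validated sequences:

  # Illustrative usage only
  let tp = Threadpool.new()
  let secureBlindingBytes = sha256.hash("some unpredictable context")  # as in the benchmark
  let status = tp.batch_verify_parallel(pubkeys, messages, signatures, secureBlindingBytes)
  doAssert status == cttBLS_Success
  tp.shutdown()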

View File

@ -102,7 +102,7 @@ func fromDigest(dst: var Fr[BLS12_381], src: array[32, byte]) =
Fr[BLS12_381].getNegInvModWord(),
Fr[BLS12_381].getSpareBits())
func fiatShamirChallenge(dst: var Fr[BLS12_381], blob: Blob, commitmentBytes: array[BYTES_PER_COMMITMENT, byte]) =
func fiatShamirChallenge(dst: ptr Fr[BLS12_381], blob: ptr Blob, commitmentBytes: ptr array[BYTES_PER_COMMITMENT, byte]) =
## Compute a Fiat-Shamir challenge
## compute_challenge: https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/deneb/polynomial-commitments.md#compute_challenge
var transcript {.noInit.}: sha256
@ -114,12 +114,12 @@ func fiatShamirChallenge(dst: var Fr[BLS12_381], blob: Blob, commitmentBytes: ar
transcript.update(default(array[16-sizeof(uint64), byte]))
transcript.update(FIELD_ELEMENTS_PER_BLOB.uint64.toBytes(bigEndian))
transcript.update(blob)
transcript.update(commitmentBytes)
transcript.update(blob[])
transcript.update(commitmentBytes[])
var challenge {.noInit.}: array[32, byte]
transcript.finish(challenge)
dst.fromDigest(challenge)
dst[].fromDigest(challenge)
func computePowers(dst: ptr UncheckedArray[Fr[BLS12_381]], len: int, base: Fr[BLS12_381]) =
## We need linearly independent random numbers
@ -217,7 +217,7 @@ func blob_to_field_polynomial(
# - Either we are in "HappyPath" section that shortcuts to resource cleanup on error
# - or there are no resources to clean and we can early return from a function.
template check(evalExpr: CttCodecScalarStatus): untyped {.dirty.} =
template checkReturn(evalExpr: CttCodecScalarStatus): untyped {.dirty.} =
# Translate codec status code to KZG status code
# Beware of resource cleanup like heap allocation, this can early exit the caller.
block:
@ -227,7 +227,7 @@ template check(evalExpr: CttCodecScalarStatus): untyped {.dirty.} =
of cttCodecScalar_Zero: discard
of cttCodecScalar_ScalarLargerThanCurveOrder: return cttEthKZG_ScalarLargerThanCurveOrder
template check(evalExpr: CttCodecEccStatus): untyped {.dirty.} =
template checkReturn(evalExpr: CttCodecEccStatus): untyped {.dirty.} =
# Translate codec status code to KZG status code
# Beware of resource cleanup like heap allocation, this can early exit the caller.
block:
@ -248,7 +248,7 @@ template check(Section: untyped, evalExpr: CttCodecScalarStatus): untyped {.dirt
case status
of cttCodecScalar_Success: discard
of cttCodecScalar_Zero: discard
of cttCodecScalar_ScalarLargerThanCurveOrder: result = cttEthKZG_EccPointNotInSubGroup; break Section
of cttCodecScalar_ScalarLargerThanCurveOrder: result = cttEthKZG_ScalarLargerThanCurveOrder; break Section
template check(Section: untyped, evalExpr: CttCodecEccStatus): untyped {.dirty.} =
# Translate codec status code to KZG status code
@ -305,8 +305,8 @@ func compute_kzg_proof*(
blob: ptr Blob,
z_bytes: array[32, byte]): CttEthKzgStatus {.tags:[Alloca, HeapAlloc, Vartime].} =
## Generate:
## - A proof of correct evaluation.
## - y = p(z), the evaluation of p at the challenge z, with p being the Blob interpreted as a polynomial.
## - A zero-knowledge proof of correct evaluation.
##
## Mathematical description
## [proof]₁ = [(p(τ) - p(z)) / (τ-z)]₁, with p(τ) being the commitment, i.e. the evaluation of p at the powers of τ
@ -320,7 +320,7 @@ func compute_kzg_proof*(
# Random or Fiat-Shamir challenge
var z {.noInit.}: Fr[BLS12_381]
check z.bytes_to_bls_field(z_bytes)
checkReturn z.bytes_to_bls_field(z_bytes)
let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], 64)
@ -354,16 +354,16 @@ func verify_kzg_proof*(
## Verify KZG proof that p(z) == y where p(z) is the polynomial represented by "polynomial_kzg"
var commitment {.noInit.}: KZGCommitment
check commitment.bytes_to_kzg_commitment(commitment_bytes)
checkReturn commitment.bytes_to_kzg_commitment(commitment_bytes)
var challenge {.noInit.}: matchingOrderBigInt(BLS12_381)
check challenge.bytes_to_bls_bigint(z_bytes)
checkReturn challenge.bytes_to_bls_bigint(z_bytes)
var eval_at_challenge {.noInit.}: matchingOrderBigInt(BLS12_381)
check eval_at_challenge.bytes_to_bls_bigint(y_bytes)
checkReturn eval_at_challenge.bytes_to_bls_bigint(y_bytes)
var proof {.noInit.}: KZGProof
check proof.bytes_to_kzg_proof(proof_bytes)
checkReturn proof.bytes_to_kzg_proof(proof_bytes)
let verif = kzg_verify(ECP_ShortW_Aff[Fp[BLS12_381], G1](commitment),
challenge, eval_at_challenge,
@ -383,7 +383,7 @@ func compute_blob_kzg_proof*(
## This method does not verify that the commitment is correct with respect to `blob`.
var commitment {.noInit.}: KZGCommitment
check commitment.bytes_to_kzg_commitment(commitment_bytes)
checkReturn commitment.bytes_to_kzg_commitment(commitment_bytes)
# Blob -> Polynomial
let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], 64)
@ -394,7 +394,7 @@ func compute_blob_kzg_proof*(
# Fiat-Shamir challenge
var challenge {.noInit.}: Fr[BLS12_381]
challenge.fiatShamirChallenge(blob[], commitment_bytes)
challenge.addr.fiatShamirChallenge(blob, commitment_bytes.unsafeAddr)
# KZG Prove
var y {.noInit.}: Fr[BLS12_381] # y = p(z), eval at challenge z
@ -421,10 +421,10 @@ func verify_blob_kzg_proof*(
## Given a blob and a KZG proof, verify that the blob data corresponds to the provided commitment.
var commitment {.noInit.}: KZGCommitment
check commitment.bytes_to_kzg_commitment(commitment_bytes)
checkReturn commitment.bytes_to_kzg_commitment(commitment_bytes)
var proof {.noInit.}: KZGProof
check proof.bytes_to_kzg_proof(proof_bytes)
checkReturn proof.bytes_to_kzg_proof(proof_bytes)
let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], 64)
let invRootsMinusZ = allocHeapAligned(array[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], alignment = 64)
@ -435,7 +435,7 @@ func verify_blob_kzg_proof*(
# Fiat-Shamir challenge
var challengeFr {.noInit.}: Fr[BLS12_381]
challengeFr.fiatShamirChallenge(blob[], commitment_bytes)
challengeFr.addr.fiatShamirChallenge(blob, commitment_bytes.unsafeAddr)
var challenge, eval_at_challenge {.noInit.}: matchingOrderBigInt(BLS12_381)
challenge.fromField(challengeFr)
@ -510,7 +510,7 @@ func verify_blob_kzg_proof_batch*(
for i in 0 ..< n:
check HappyPath, commitments[i].bytes_to_kzg_commitment(commitments_bytes[i])
check HappyPath, poly.blob_to_field_polynomial(blobs[i].addr)
challenges[i].fiatShamirChallenge(blobs[i], commitments_bytes[i])
challenges[i].addr.fiatShamirChallenge(blobs[i].addr, commitments_bytes[i].addr)
# Lagrange Polynomial evaluation
# ------------------------------

View File

@ -0,0 +1,452 @@
# Constantine
# Copyright (c) 2018-2019 Status Research & Development GmbH
# Copyright (c) 2020-Present Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import ethereum_eip4844_kzg {.all.}
export ethereum_eip4844_kzg
import
./math/config/curves,
./math/[ec_shortweierstrass, arithmetic, extension_fields],
./math/polynomials/polynomials_parallel,
./hashes,
./commitments/kzg_polynomial_commitments_parallel,
./serialization/[codecs_status_codes, codecs_bls12_381],
./math/io/io_fields,
./platforms/[abstractions, allocs],
./threadpool/threadpool
## ############################################################
##
## KZG Polynomial Commitments for Ethereum
## Parallel Edition
##
## ############################################################
##
## This module implements KZG Polynomial commitments (Kate, Zaverucha, Goldberg)
## for the Ethereum blockchain.
##
## References:
## - Ethereum spec:
## https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/deneb/polynomial-commitments.md
## - KZG Paper:
## Constant-Size Commitments to Polynomials and Their Applications
## Kate, Zaverucha, Goldberg, 2010
## https://www.iacr.org/archive/asiacrypt2010/6477178/6477178.pdf
## https://cacr.uwaterloo.ca/techreports/2010/cacr2010-10.pdf
## - Audited reference implementation
## https://github.com/ethereum/c-kzg-4844
proc blob_to_bigint_polynomial_parallel(
tp: Threadpool,
dst: ptr PolynomialEval[FIELD_ELEMENTS_PER_BLOB, matchingOrderBigInt(BLS12_381)],
blob: ptr Blob): CttCodecScalarStatus =
## Convert a blob to a polynomial in evaluation form
mixin globalStatus
static:
doAssert sizeof(dst[]) == sizeof(Blob)
doAssert sizeof(array[FIELD_ELEMENTS_PER_BLOB, array[32, byte]]) == sizeof(Blob)
let view = cast[ptr array[FIELD_ELEMENTS_PER_BLOB, array[32, byte]]](blob)
tp.parallelFor i in 0 ..< FIELD_ELEMENTS_PER_BLOB:
captures: {dst, view}
reduceInto(globalStatus: CttCodecScalarStatus):
prologue:
var workerStatus = cttCodecScalar_Success
forLoop:
let iterStatus = dst.evals[i].bytes_to_bls_bigint(view[i])
if workerStatus == cttCodecScalar_Success:
# Propagate the error, if any, from the current iteration
workerStatus = iterStatus
merge(remoteFutureStatus: Flowvar[CttCodecScalarStatus]):
let remoteStatus = sync(remoteFutureStatus)
if workerStatus == cttCodecScalar_Success:
# Propagate the error, if any, from the remote worker
workerStatus = remoteStatus
epilogue:
return workerStatus
return sync(globalStatus)
proc blob_to_field_polynomial_parallel_async(
tp: Threadpool,
dst: ptr PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]],
blob: ptr Blob): Flowvar[CttCodecScalarStatus] =
## Convert a blob to a polynomial in evaluation form
## The result is a `Flowvar` handle and MUST be awaited with `sync`
mixin globalStatus
static:
doAssert sizeof(dst[]) == sizeof(Blob)
doAssert sizeof(array[FIELD_ELEMENTS_PER_BLOB, array[32, byte]]) == sizeof(Blob)
let view = cast[ptr array[FIELD_ELEMENTS_PER_BLOB, array[32, byte]]](blob)
tp.parallelFor i in 0 ..< FIELD_ELEMENTS_PER_BLOB:
captures: {dst, view}
reduceInto(globalStatus: CttCodecScalarStatus):
prologue:
var workerStatus = cttCodecScalar_Success
forLoop:
let iterStatus = dst.evals[i].bytes_to_bls_field(view[i])
if workerStatus == cttCodecScalar_Success:
# Propagate the error, if any, from the current iteration
workerStatus = iterStatus
merge(remoteFutureStatus: Flowvar[CttCodecScalarStatus]):
let remoteStatus = sync(remoteFutureStatus)
if workerStatus == cttCodecScalar_Success:
# Propagate the error, if any, from the remote worker
workerStatus = remoteStatus
epilogue:
return workerStatus
return globalStatus
# Ethereum KZG public API
# ------------------------------------------------------------
#
# We use a simple goto state machine to handle errors and cleanup (if allocs were done)
# and have 2 different checks:
# - Either we are in "HappyPath" section that shortcuts to resource cleanup on error
# - or there are no resources to clean and we can early return from a function.
func kzgifyStatus(status: CttCodecScalarStatus or CttCodecEccStatus): CttEthKzgStatus {.inline.} =
checkReturn status
proc blob_to_kzg_commitment_parallel*(
tp: Threadpool,
ctx: ptr EthereumKZGContext,
dst: var array[48, byte],
blob: ptr Blob): CttEthKzgStatus =
## Compute a commitment to the `blob`.
## The commitment can be verified without needing the full `blob`
##
## Mathematical description
## commitment = [p(τ)]₁
##
## The blob data is used as a polynomial,
## the polynomial is evaluated at powers of tau τ, a trusted setup.
##
## Verification can be done by verifying the relation:
## proof.(τ - z) = p(τ)-p(z)
## which doesn't require the full blob but only evaluations of it
## - at τ, p(τ) is the commitment
## - and at the verification challenge z.
##
## with proof = [(p(τ) - p(z)) / (τ-z)]₁
let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, matchingOrderBigInt(BLS12_381)], 64)
block HappyPath:
check HappyPath, tp.blob_to_bigint_polynomial_parallel(poly, blob)
var r {.noinit.}: ECP_ShortW_Aff[Fp[BLS12_381], G1]
tp.kzg_commit_parallel(r, poly.evals, ctx.srs_lagrange_g1)
discard dst.serialize_g1_compressed(r)
result = cttEthKZG_Success
freeHeapAligned(poly)
return result
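As an illustration (not part of the diff), a usage sketch of `blob_to_kzg_commitment_parallel` mirroring the benchmark: `blob` is an assumed, caller-populated `Blob`, and the trusted-setup loader is the test helper used by the benchmark:

  # Illustrative usage only
  let ctx = load_ethereum_kzg_test_trusted_setup_mainnet()
  let tp = Threadpool.new()
  var commitment {.noInit.}: array[48, byte]
  doAssert cttEthKZG_Success == tp.blob_to_kzg_commitment_parallel(ctx, commitment, blob.addr)
  tp.shutdown()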
proc compute_kzg_proof_parallel*(
tp: Threadpool,
ctx: ptr EthereumKZGContext,
proof_bytes: var array[48, byte],
y_bytes: var array[32, byte],
blob: ptr Blob,
z_bytes: array[32, byte]): CttEthKzgStatus =
## Generate:
## - A proof of correct evaluation.
## - y = p(z), the evaluation of p at the challenge z, with p being the Blob interpreted as a polynomial.
##
## Mathematical description
## [proof]₁ = [(p(τ) - p(z)) / (τ-z)]₁, with p(τ) being the commitment, i.e. the evaluation of p at the powers of τ
## The notation [a]₁ corresponds to the scalar multiplication of a by the generator of 𝔾1
##
## Verification can be done by verifying the relation:
## proof.(τ - z) = p(τ)-p(z)
## which doesn't require the full blob but only evaluations of it
## - at τ, p(τ) is the commitment
## - and at the verification challenge z.
# Random or Fiat-Shamir challenge
var z {.noInit.}: Fr[BLS12_381]
checkReturn z.bytes_to_bls_field(z_bytes)
let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], 64)
block HappyPath:
# Blob -> Polynomial
check HappyPath, sync tp.blob_to_field_polynomial_parallel_async(poly, blob)
# KZG Prove
var y {.noInit.}: Fr[BLS12_381] # y = p(z), eval at challenge z
var proof {.noInit.}: ECP_ShortW_Aff[Fp[BLS12_381], G1] # [proof]₁ = [(p(τ) - p(z)) / (τ-z)]₁
tp.kzg_prove_parallel(
proof, y,
poly, ctx.domain.addr,
z.addr, ctx.srs_lagrange_g1,
isBitReversedDomain = true)
discard proof_bytes.serialize_g1_compressed(proof) # cannot fail
y_bytes.marshal(y, bigEndian) # cannot fail
result = cttEthKZG_Success
freeHeapAligned(poly)
return result
proc compute_blob_kzg_proof_parallel*(
tp: Threadpool,
ctx: ptr EthereumKZGContext,
proof_bytes: var array[48, byte],
blob: ptr Blob,
commitment_bytes: array[48, byte]): CttEthKzgStatus =
## Given a blob, return the KZG proof that is used to verify it against the commitment.
## This method does not verify that the commitment is correct with respect to `blob`.
var commitment {.noInit.}: KZGCommitment
checkReturn commitment.bytes_to_kzg_commitment(commitment_bytes)
# Blob -> Polynomial
let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], 64)
block HappyPath:
# Blob -> Polynomial, spawn async on other threads
let convStatus = tp.blob_to_field_polynomial_parallel_async(poly, blob)
# Fiat-Shamir challenge
var challenge {.noInit.}: Fr[BLS12_381]
challenge.addr.fiatShamirChallenge(blob, commitment_bytes.unsafeAddr)
# Await conversion to field polynomial
check HappyPath, sync(convStatus)
# KZG Prove
var y {.noInit.}: Fr[BLS12_381] # y = p(z), eval at challenge z
var proof {.noInit.}: ECP_ShortW_Aff[Fp[BLS12_381], G1] # [proof]₁ = [(p(τ) - p(z)) / (τ-z)]₁
tp.kzg_prove_parallel(
proof, y,
poly, ctx.domain.addr,
challenge.addr, ctx.srs_lagrange_g1,
isBitReversedDomain = true)
discard proof_bytes.serialize_g1_compressed(proof) # cannot fail
result = cttEthKZG_Success
freeHeapAligned(poly)
return result
proc verify_blob_kzg_proof_parallel*(
tp: Threadpool,
ctx: ptr EthereumKZGContext,
blob: ptr Blob,
commitment_bytes: array[48, byte],
proof_bytes: array[48, byte]): CttEthKzgStatus =
## Given a blob and a KZG proof, verify that the blob data corresponds to the provided commitment.
var commitment {.noInit.}: KZGCommitment
checkReturn commitment.bytes_to_kzg_commitment(commitment_bytes)
var proof {.noInit.}: KZGProof
checkReturn proof.bytes_to_kzg_proof(proof_bytes)
let poly = allocHeapAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], 64)
let invRootsMinusZ = allocHeapAligned(array[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], alignment = 64)
block HappyPath:
# Blob -> Polynomial, spawn async on other threads
let convStatus = tp.blob_to_field_polynomial_parallel_async(poly, blob)
# Fiat-Shamir challenge
var challengeFr {.noInit.}: Fr[BLS12_381]
challengeFr.addr.fiatShamirChallenge(blob, commitment_bytes.unsafeAddr)
var challenge, eval_at_challenge {.noInit.}: matchingOrderBigInt(BLS12_381)
challenge.fromField(challengeFr)
# Lagrange Polynomial evaluation
# ------------------------------
# 1. Compute 1/(ωⁱ - z) with ω a root of unity, i in [0, N).
# zIndex = i if ωⁱ - z == 0 (it is the i-th root of unity) and -1 otherwise.
let zIndex = invRootsMinusZ[].inverseRootsMinusZ_vartime(
ctx.domain, challengeFr,
earlyReturnOnZero = true)
# Await conversion to field polynomial
check HappyPath, sync(convStatus)
# 2. Actual evaluation
if zIndex == -1:
var eval_at_challenge_fr{.noInit.}: Fr[BLS12_381]
tp.evalPolyAt_parallel(
eval_at_challenge_fr,
poly, challengeFr.addr,
invRootsMinusZ,
ctx.domain.addr)
eval_at_challenge.fromField(eval_at_challenge_fr)
else:
eval_at_challenge.fromField(poly.evals[zIndex])
# KZG verification
let verif = kzg_verify(ECP_ShortW_Aff[Fp[BLS12_381], G1](commitment),
challenge, eval_at_challenge,
ECP_ShortW_Aff[Fp[BLS12_381], G1](proof),
ctx.srs_monomial_g2.coefs[1])
if verif:
result = cttEthKZG_Success
else:
result = cttEthKZG_VerificationFailure
freeHeapAligned(invRootsMinusZ)
freeHeapAligned(poly)
return result
proc verify_blob_kzg_proof_batch_parallel*(
tp: Threadpool,
ctx: ptr EthereumKZGContext,
blobs: ptr UncheckedArray[Blob],
commitments_bytes: ptr UncheckedArray[array[48, byte]],
proof_bytes: ptr UncheckedArray[array[48, byte]],
n: int,
secureRandomBytes: array[32, byte]): CttEthKzgStatus =
## Verify `n` (blob, commitment, proof) sets efficiently
##
## `n` is the number of verification sets
## - if n is negative, this procedure returns verification failure
## - if n is zero, this procedure returns verification success
##
## `secureRandomBytes` must come from a cryptographically secure RNG
## or be computed through the Fiat-Shamir heuristic.
## It serves as a random number
## that is not under the control of a potential attacker, to prevent potential
## rogue-commitment attacks exploiting the homomorphic properties of pairings,
## i.e. commitments that are linear combinations of others and whose sum would be zero.
mixin globalStatus
if n < 0:
return cttEthKZG_VerificationFailure
if n == 0:
return cttEthKZG_Success
let commitments = allocHeapArrayAligned(KZGCommitment, n, alignment = 64)
let challenges = allocHeapArrayAligned(Fr[BLS12_381], n, alignment = 64)
let evals_at_challenges = allocHeapArrayAligned(matchingOrderBigInt(BLS12_381), n, alignment = 64)
let proofs = allocHeapArrayAligned(KZGProof, n, alignment = 64)
let polys = allocHeapArrayAligned(PolynomialEval[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], n, alignment = 64)
let invRootsMinusZs = allocHeapArrayAligned(array[FIELD_ELEMENTS_PER_BLOB, Fr[BLS12_381]], n, alignment = 64)
block HappyPath:
tp.parallelFor i in 0 ..< n:
captures: {tp, ctx,
commitments, commitments_bytes,
polys, blobs,
challenges, evals_at_challenges,
proofs, proof_bytes,
invRootsMinusZs}
reduceInto(globalStatus: CttEthKzgStatus):
prologue:
var workerStatus = cttEthKZG_Success
forLoop:
let polyStatusFut = tp.blob_to_field_polynomial_parallel_async(polys[i].addr, blobs[i].addr)
let challengeStatusFut = tp.spawnAwaitable challenges[i].addr.fiatShamirChallenge(blobs[i].addr, commitments_bytes[i].addr)
let commitmentStatus = kzgifyStatus commitments[i].bytes_to_kzg_commitment(commitments_bytes[i])
if workerStatus == cttEthKZG_Success:
workerStatus = commitmentStatus
let polyStatus = kzgifyStatus sync(polyStatusFut)
if workerStatus == cttEthKZG_Success:
workerStatus = polyStatus
discard sync(challengeStatusFut)
# Lagrange Polynomial evaluation
# ------------------------------
# 1. Compute 1/(ωⁱ - z) with ω a root of unity, i in [0, N).
# zIndex = i if ωⁱ - z == 0 (it is the i-th root of unity) and -1 otherwise.
let zIndex = invRootsMinusZs[i].inverseRootsMinusZ_vartime(
ctx.domain, challenges[i],
earlyReturnOnZero = true)
# 2. Actual evaluation
if zIndex == -1:
var eval_at_challenge_fr{.noInit.}: Fr[BLS12_381]
tp.evalPolyAt_parallel(
eval_at_challenge_fr,
polys[i].addr, challenges[i].addr,
invRootsMinusZs[i].addr,
ctx.domain.addr)
evals_at_challenges[i].fromField(eval_at_challenge_fr)
else:
evals_at_challenges[i].fromField(polys[i].evals[zIndex])
let proofStatus = kzgifyStatus proofs[i].bytes_to_kzg_proof(proof_bytes[i])
if workerStatus == cttEthKZG_Success:
workerStatus = proofStatus
merge(remoteStatusFut: Flowvar[CttEthKzgStatus]):
let remoteStatus = sync(remoteStatusFut)
if workerStatus == cttEthKZG_Success:
workerStatus = remoteStatus
epilogue:
return workerStatus
result = sync(globalStatus)
if result != cttEthKZG_Success:
break HappyPath
var randomBlindingFr {.noInit.}: Fr[BLS12_381]
block blinding: # Ensure we don't multiply by 0 for blinding
# 1. Try with the random number supplied
for i in 0 ..< secureRandomBytes.len:
if secureRandomBytes[i] != byte 0:
randomBlindingFr.fromDigest(secureRandomBytes)
break blinding
# 2. If it's 0 (how?!), we just hash all the Fiat-Shamir challenges
var transcript: sha256
transcript.init()
transcript.update(RANDOM_CHALLENGE_KZG_BATCH_DOMAIN)
transcript.update(cast[ptr UncheckedArray[byte]](challenges).toOpenArray(0, n*sizeof(Fr[BLS12_381])-1))
var blindingBytes {.noInit.}: array[32, byte]
transcript.finish(blindingBytes)
randomBlindingFr.fromDigest(blindingBytes)
# TODO: use parallel prefix product for parallel powers compute
let linearIndepRandNumbers = allocHeapArrayAligned(Fr[BLS12_381], n, alignment = 64)
linearIndepRandNumbers.computePowers(n, randomBlindingFr)
type EcAffArray = ptr UncheckedArray[ECP_ShortW_Aff[Fp[BLS12_381], G1]]
let verif = kzg_verify_batch(
cast[EcAffArray](commitments),
challenges,
evals_at_challenges,
cast[EcAffArray](proofs),
linearIndepRandNumbers,
n,
ctx.srs_monomial_g2.coefs[1])
if verif:
result = cttEthKZG_Success
else:
result = cttEthKZG_VerificationFailure
freeHeapAligned(linearIndepRandNumbers)
freeHeapAligned(invRootsMinusZs)
freeHeapAligned(polys)
freeHeapAligned(proofs)
freeHeapAligned(evals_at_challenges)
freeHeapAligned(challenges)
freeHeapAligned(commitments)
return result
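A hedged caller-side sketch (not part of this diff) of the batched verification entry point, mirroring the benchmark; `ctx` is a loaded `EthereumKZGContext` and `blobs`, `commitments`, `proofs` are assumed caller sequences of equal length:

  # Illustrative usage only
  let tp = Threadpool.new()
  var secureRandomBytes {.noInit.}: array[32, byte]
  discard sysrand(secureRandomBytes)                 # cryptographically secure randomness
  let status = tp.verify_blob_kzg_proof_batch_parallel(
    ctx,
    blobs.asUnchecked(),
    commitments.asUnchecked(),
    proofs.asUnchecked(),
    blobs.len,
    secureRandomBytes)
  doAssert status == cttEthKZG_Success
  tp.shutdown()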

View File

@ -576,3 +576,14 @@ proc multiScalarMul_vartime_parallel*[bits: static int, EC, F, G](
let N = points.len
tp.multiScalarMul_dispatch_vartime_parallel(r.addr, coefs.asUnchecked(), points.asUnchecked(), N)
proc multiScalarMul_vartime_parallel*[bits: static int, EC, F, G](
tp: Threadpool,
r: ptr EC,
coefs: ptr UncheckedArray[BigInt[bits]],
points: ptr UncheckedArray[ECP_ShortW_Aff[F, G]],
len: int) {.meter, inline.} =
## Multiscalar multiplication:
## r <- [a₀]P₀ + [a₁]P₁ + ... + [aₙ]Pₙ
## This function can be nested in another parallel function
tp.multiScalarMul_dispatch_vartime_parallel(r, coefs, points, len)
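Illustrative only (not part of the diff): the pointer-based overload can be called from code that is itself already running on the threadpool; `coefs` and `points` are assumed caller buffers of matching length:

  # Illustrative usage only
  var r {.noInit.}: ECP_ShortW_Jac[Fp[BLS12_381], G1]
  tp.multiScalarMul_vartime_parallel(r.addr, coefs.asUnchecked(), points.asUnchecked(), points.len)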

View File

@ -371,13 +371,6 @@ func accum_half_vartime[F; G: static Subgroup](
# Batch addition - High-level
# ------------------------------------------------------------
template `+=`[F; G: static Subgroup](P: var ECP_ShortW_JacExt[F, G], Q: ECP_ShortW_Aff[F, G]) =
# All vartime procedures MUST be tagged vartime
# Hence we do not expose `+=` for extended jacobian operation to prevent `vartime` mistakes
# The following algorithms are all tagged vartime, hence for genericity
# we create a local `+=` for this module only
madd_vartime(P, P, Q)
func accumSum_chunk_vartime*[F; G: static Subgroup](
r: var (ECP_ShortW_Jac[F, G] or ECP_ShortW_Prj[F, G] or ECP_ShortW_JacExt[F, G]),
points: ptr UncheckedArray[ECP_ShortW_Aff[F, G]], len: int) {.noInline, tags:[VarTime, Alloca].} =
@ -398,7 +391,7 @@ func accumSum_chunk_vartime*[F; G: static Subgroup](
while n >= minNumPointsSerial:
if (n and 1) == 1: # odd number of points
## Accumulate the last
r += points[n-1]
r.madd_vartime(r, points[n-1])
n -= 1
# Compute [0, n/2) += [n/2, n)
@ -409,7 +402,7 @@ func accumSum_chunk_vartime*[F; G: static Subgroup](
# Tail
for i in 0 ..< n:
r += points[i]
r.madd_vartime(r, points[i])
func accum_batch_vartime[F; G: static Subgroup](
r: var (ECP_ShortW_Jac[F, G] or ECP_ShortW_Prj[F, G] or ECP_ShortW_JacExt[F, G]),
@ -472,36 +465,66 @@ func sum_reduce_vartime*[F; G: static Subgroup](
type EcAddAccumulator_vartime*[EC, F; G: static Subgroup; AccumMax: static int] = object
## Elliptic curve addition accumulator
## **Variable-Time**
# The `cur` is dereferenced first so better locality if at the beginning
# `len` is dereferenced first, so locality is better if it is at the beginning
# Do we want alignment guarantees?
cur: uint32
len: uint32
accum: EC
buffer: array[AccumMax, ECP_ShortW_Aff[F, G]]
func init*(ctx: var EcAddAccumulator_vartime) =
static: doAssert EcAddAccumulator_vartime.AccumMax >= 16, "There is no point in a EcAddBatchAccumulator if the batch size is too small"
ctx.accum.setInf()
ctx.cur = 0
ctx.len = 0
func consumeBuffer[EC, F; G: static Subgroup; AccumMax: static int](
ctx: var EcAddAccumulator_vartime[EC, F, G, AccumMax]) {.noInline, tags: [VarTime, Alloca].}=
if ctx.cur == 0:
if ctx.len == 0:
return
ctx.accum.accumSum_chunk_vartime(ctx.buffer.asUnchecked(), ctx.cur)
ctx.cur = 0
ctx.accum.accumSum_chunk_vartime(ctx.buffer.asUnchecked(), ctx.len.int)
ctx.len = 0
func update*[EC, F, G; AccumMax: static int](
ctx: var EcAddAccumulator_vartime[EC, F, G, AccumMax],
P: ECP_ShortW_Aff[F, G]) =
if ctx.cur == AccumMax:
if P.isInf().bool:
return
if ctx.len == AccumMax:
ctx.consumeBuffer()
ctx.buffer[ctx.cur] = P
ctx.cur += 1
ctx.buffer[ctx.len] = P
ctx.len += 1
func handover*(ctx: var EcAddAccumulator_vartime) {.inline.} =
ctx.consumeBuffer()
func merge*[EC, F, G; AccumMax: static int](
ctxDst: var EcAddAccumulator_vartime[EC, F, G, AccumMax],
ctxSrc: EcAddAccumulator_vartime[EC, F, G, AccumMax]) =
var sCur = 0'u32
var itemsLeft = ctxSrc.len
if ctxDst.len + ctxSrc.len >= AccumMax:
# previous partial update, fill the buffer and do a batch addition
let free = AccumMax - ctxDst.len
for i in 0 ..< free:
ctxDst.buffer[ctxDst.len+i] = ctxSrc.buffer[i]
ctxDst.len = AccumMax
ctxDst.consumeBuffer()
sCur = free
itemsLeft -= free
# Store the tail
for i in 0 ..< itemsLeft:
ctxDst.buffer[ctxDst.len+i] = ctxSrc.buffer[sCur+i]
ctxDst.len += itemsLeft
ctxDst.accum.sum_vartime(ctxDst.accum, ctxSrc.accum)
# TODO: `merge` for parallel recursive divide-and-conquer processing
func finish*[EC, F, G; AccumMax: static int](
ctx: var EcAddAccumulator_vartime[EC, F, G, AccumMax],

View File

@ -30,19 +30,13 @@ proc sum_reduce_vartime_parallelChunks[F; G: static Subgroup](
points: openArray[ECP_ShortW_Aff[F, G]]) {.noInline.} =
## Batch addition of `points` into `r`
## `r` is overwritten
## Compute is parallelized, if beneficial.
## This function can be nested in another parallel function
## Scales better for large number of points
# Chunking constants in ec_shortweierstrass_batch_ops.nim
const maxTempMem = 262144 # 2¹⁸ = 262144
const maxChunkSize = maxTempMem div sizeof(ECP_ShortW_Aff[F, G])
const minChunkSize = (maxChunkSize * 60) div 100 # We want 60%~100% full chunks
if points.len <= maxChunkSize:
r.setInf()
r.accumSum_chunk_vartime(points.asUnchecked(), points.len)
return
let chunkDesc = balancedChunksPrioSize(
start = 0, stopEx = points.len,
minChunkSize, maxChunkSize,
@ -72,48 +66,58 @@ proc sum_reduce_vartime_parallelChunks[F; G: static Subgroup](
partialResultsAffine.batchAffine(partialResults, chunkDesc.numChunks)
r.sum_reduce_vartime(partialResultsAffine, chunkDesc.numChunks)
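# Note: the per-chunk partial sums above are converted back to affine coordinates
# (batchAffine amortizes the field inversions across chunks) so the final
# reduction can reuse the cheap affine batch-addition path.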
proc sum_reduce_vartime_parallelFor[F; G: static Subgroup](
proc sum_reduce_vartime_parallelAccums[F; G: static Subgroup](
tp: Threadpool,
r: var (ECP_ShortW_Jac[F, G] or ECP_ShortW_Prj[F, G]),
points: openArray[ECP_ShortW_Aff[F, G]]) =
## Batch addition of `points` into `r`
## `r` is overwritten
## Compute is parallelized, if beneficial.
## 2x faster for a low number of points
mixin globalSum
const maxTempMem = 1 shl 18 # 2¹⁸ = 262144
const maxChunkSize = maxTempMem div sizeof(ECP_ShortW_Aff[F, G])
type Acc = EcAddAccumulator_vartime[typeof(r), F, G, maxChunkSize]
const maxTempMem = 262144 # 2¹⁸ = 262144
const maxStride = maxTempMem div sizeof(ECP_ShortW_Aff[F, G])
let ps = points.asUnchecked()
let N = points.len
let p = points.asUnchecked
let pointsLen = points.len
mixin globalAcc
tp.parallelFor i in 0 ..< points.len:
stride: maxStride
captures: {p, pointsLen}
reduceInto(globalSum: typeof(r)):
const chunkSize = 32
tp.parallelFor i in 0 ..< N:
stride: chunkSize
captures: {ps, N}
reduceInto(globalAcc: ptr Acc):
prologue:
var localSum {.noInit.}: typeof(r)
localSum.setInf()
var workerAcc = allocHeap(Acc)
workerAcc[].init()
forLoop:
let n = min(maxStride, pointsLen-i)
localSum.accumSum_chunk_vartime(p +% i, n)
merge(remoteSum: Flowvar[typeof(r)]):
localSum.sum_vartime(localSum, sync(remoteSum))
for j in i ..< min(i+chunkSize, N):
workerAcc[].update(ps[j])
merge(remoteAccFut: Flowvar[ptr Acc]):
let remoteAcc = sync(remoteAccFut)
workerAcc[].merge(remoteAcc[])
freeHeap(remoteAcc)
epilogue:
return localSum
workerAcc[].handover()
return workerAcc
r = sync(globalSum)
let ctx = sync(globalAcc)
ctx[].finish(r)
freeHeap(ctx)
proc sum_reduce_vartime_parallel*[F; G: static Subgroup](
tp: Threadpool,
r: var (ECP_ShortW_Jac[F, G] or ECP_ShortW_Prj[F, G]),
points: openArray[ECP_ShortW_Aff[F, G]]) {.inline.} =
## Batch addition of `points` into `r`
## Parallel Batch addition of `points` into `r`
## `r` is overwritten
## Compute is parallelized, if beneficial.
## This function cannot be nested in another parallel function
when false:
tp.sum_reduce_vartime_parallelFor(r, points)
if points.len < 256:
r.setInf()
r.accumSum_chunk_vartime(points.asUnchecked(), points.len)
elif points.len < 8192:
tp.sum_reduce_vartime_parallelAccums(r, points)
else:
tp.sum_reduce_vartime_parallelChunks(r, points)
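# Illustrative call-site sketch for the dispatcher above (the curve choice,
# proc name and surrounding setup are example assumptions, not part of this commit):
proc parallelSumSketch(points: seq[ECP_ShortW_Aff[Fp[BLS12_381], G1]]) =
  let tp = Threadpool.new()
  var r {.noInit.}: ECP_ShortW_Jac[Fp[BLS12_381], G1]
  tp.sum_reduce_vartime_parallel(r, points)  # < 256 points: serial, < 8192: per-worker accumulators, else: chunked
  tp.shutdown()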

View File

@ -50,40 +50,40 @@ import
# and we can choose N to be way less than 68.
# So for compactness we take Aranha's approach.
const AccumMax = 8
const MillerAccumMax = 8
# Max buffer size before triggering a Miller Loop.
# Assuming pairing costs 100, with 50 for Miller Loop and 50 for Final exponentiation.
#
# N unbatched pairings would cost N*100
# N maximally batched pairings would cost N*50 + 50
# N AccumMax batched pairings would cost N*50 + N/AccumMax*(Fpᵏ mul) + 50
# N AccumMax batched pairings would cost N*50 + N/MillerAccumMax*(Fpᵏ mul) + 50
#
# Fpᵏ mul costs 0.7% of a Miller Loop and so is negligible.
# By choosing MillerAccumMax = 8, the cost is amortized to below 0.1% per pairing.
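# Worked example of the cost model above (illustrative numbers only):
# with an Fpᵏ mul at 0.7% of a 50-unit Miller Loop (≈ 0.35), for N = 1024 pairings,
#   unbatched:           1024 × 100                        = 102 400
#   fully batched:       1024 × 50 + 50                    =  51 250
#   MillerAccumMax = 8:  1024 × 50 + (1024/8) × 0.35 + 50  ≈  51 295
# i.e. the extra Fpᵏ-mul cost is ≈ 0.35/8 ≈ 0.044 per pairing, under 0.1% of a
# Miller Loop, while keeping the point buffers small.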
type MillerAccumulator*[FF1, FF2; FpK: ExtensionField] = object
accum: FpK
Ps: array[AccumMax, ECP_ShortW_Aff[FF1, G1]]
Qs: array[AccumMax, ECP_ShortW_Aff[FF2, G2]]
cur: uint32
Ps: array[MillerAccumMax, ECP_ShortW_Aff[FF1, G1]]
Qs: array[MillerAccumMax, ECP_ShortW_Aff[FF2, G2]]
len: uint32
accOnce: bool
func init*(ctx: var MillerAccumulator) =
ctx.cur = 0
ctx.len = 0
ctx.accOnce = false
func consumeBuffers[FF1, FF2, FpK](ctx: var MillerAccumulator[FF1, FF2, FpK]) =
if ctx.cur == 0:
if ctx.len == 0:
return
var t{.noInit.}: FpK
t.millerLoop(ctx.Qs.asUnchecked(), ctx.Ps.asUnchecked(), ctx.cur.int)
t.millerLoop(ctx.Qs.asUnchecked(), ctx.Ps.asUnchecked(), ctx.len.int)
if ctx.accOnce:
ctx.accum *= t
else:
ctx.accum = t
ctx.accOnce = true
ctx.cur = 0
ctx.len = 0
func update*[FF1, FF2, FpK](ctx: var MillerAccumulator[FF1, FF2, FpK], P: ECP_ShortW_Aff[FF1, G1], Q: ECP_ShortW_Aff[FF2, G2]): bool =
## Aggregate another set for pairing
@ -94,34 +94,54 @@ func update*[FF1, FF2, FpK](ctx: var MillerAccumulator[FF1, FF2, FpK], P: ECP_Sh
if P.isInf().bool or Q.isInf().bool:
return false
if ctx.cur == AccumMax:
if ctx.len == MillerAccumMax:
ctx.consumeBuffers()
ctx.Ps[ctx.cur] = P
ctx.Qs[ctx.cur] = Q
ctx.cur += 1
ctx.Ps[ctx.len] = P
ctx.Qs[ctx.len] = Q
ctx.len += 1
return true
func handover*(ctx: var MillerAccumulator) {.inline.} =
## Prepare accumulator for cheaper merging.
##
## In a multi-threaded context, multiple accumulators can be created, each processing a subset of the batch in parallel.
## Accumulators can then be merged:
## merger_accumulator += mergee_accumulator
## Merging triggers an expensive reduction (a multi-Miller loop) whenever the accumulation threshold of 8 is reached.
## However, merging two already-reduced accumulators is 136x cheaper.
##
## `Handover` forces this reduction on local threads to limit the burden on the merger thread.
ctx.consumeBuffers()
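# A sequential sketch of the split / handover / merge pattern described above
# (the BLS12-381 field types and the proc name are assumptions for illustration;
# in real use each accumulator would live on its own worker thread):
proc twoAccumulatorSketch(
       Ps: openArray[ECP_ShortW_Aff[Fp[BLS12_381], G1]],
       Qs: openArray[ECP_ShortW_Aff[Fp2[BLS12_381], G2]],
       gt: var Fp12[BLS12_381]) =
  var accA, accB: MillerAccumulator[Fp[BLS12_381], Fp2[BLS12_381], Fp12[BLS12_381]]
  accA.init()
  accB.init()
  let half = Ps.len div 2
  for i in 0 ..< half:
    discard accA.update(Ps[i], Qs[i])   # returns false only for points at infinity
  for i in half ..< Ps.len:
    discard accB.update(Ps[i], Qs[i])
  accA.handover()                       # reduce on the local/worker thread
  accB.handover()
  accA.merge(accB)                      # one Fpᵏ multiplication once both are reduced
  accA.finish(gt)                       # accumulated multi-Miller-loop result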
func merge*(ctxDst: var MillerAccumulator, ctxSrc: MillerAccumulator) =
## Merge ctxDst <- ctxDst + ctxSrc
var dCur = ctxDst.cur
var sCur = 0'u
var itemsLeft = ctxSrc.cur
var itemsLeft = ctxSrc.len
if dCur != 0 and dCur+itemsLeft >= AccumMax:
if ctxDst.len + itemsLeft >= MillerAccumMax:
# Previous partial update, fill the buffer and do one miller loop
let free = AccumMax - dCur
let free = MillerAccumMax - ctxDst.len
for i in 0 ..< free:
ctxDst[dCur+i] = ctxSrc[i]
ctxDst.Ps[ctxDst.len+i] = ctxSrc.Ps[i]
ctxDst.Qs[ctxDst.len+i] = ctxSrc.Qs[i]
ctxDst.len = MillerAccumMax
ctxDst.consumeBuffers()
dCur = 0
sCur = free
itemsLeft -= free
if itemsLeft != 0:
# Store the tail
for i in 0 ..< itemsLeft:
ctxDst[dCur+i] = ctxSrc[sCur+i]
# Store the tail
for i in 0 ..< itemsLeft:
ctxDst.Ps[ctxDst.len+i] = ctxSrc.Ps[sCur+i]
ctxDst.Qs[ctxDst.len+i] = ctxSrc.Qs[sCur+i]
ctxDst.len += itemsLeft
if ctxDst.accOnce and ctxSrc.accOnce:
ctxDst.accum *= ctxSrc.accum
elif ctxSrc.accOnce:
ctxDst.accum = ctxSrc.accum
ctxDst.accOnce = true
func finish*[FF1, FF2, FpK](ctx: var MillerAccumulator[FF1, FF2, FpK], multiMillerLoopResult: var Fpk) =
## Output the accumulation of multiple Miller Loops

View File

@ -0,0 +1,156 @@
# Constantine
# Copyright (c) 2018-2019 Status Research & Development GmbH
# Copyright (c) 2020-Present Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import ./polynomials {.all.}
export polynomials
import
../config/curves,
../arithmetic,
../../platforms/bithacks,
../../threadpool/threadpool
## ############################################################
##
## Polynomials
## Parallel Edition
##
## ############################################################
proc evalPolyAt_parallel*[N: static int, Field](
tp: Threadpool,
r: var Field,
poly: ptr PolynomialEval[N, Field],
z: ptr Field,
invRootsMinusZ: ptr array[N, Field],
domain: ptr PolyDomainEval[N, Field]) =
## Evaluate a polynomial in evaluation form
## at the point z
## z MUST NOT be one of the roots of unity
##
## Parallelism: This only returns when computation is fully done
# p(z) = (1-zⁿ)/n ∑ ωⁱ/(ωⁱ-z) . p(ωⁱ)
mixin globalSum
static: doAssert N.isPowerOf2_vartime()
tp.parallelFor i in 0 ..< N:
captures: {poly, domain, invRootsMinusZ}
reduceInto(globalSum: Field):
prologue:
var workerSum {.noInit.}: Field
workerSum.setZero()
forLoop:
var iterSummand {.noInit.}: Field
iterSummand.prod(domain.rootsOfUnity[i], invRootsMinusZ[i])
iterSummand *= poly.evals[i]
workerSum += iterSummand
merge(remoteSum: Flowvar[Field]):
workerSum += sync(remoteSum)
epilogue:
return workerSum
var t {.noInit.}: Field
t = z[]
const numDoublings = log2_vartime(uint32 N) # N is a power of 2
t.square_repeated(int numDoublings) # exponentiation by a power of 2
t.diff(Field(mres: Field.getMontyOne()), t) # TODO: refactor getMontyOne to getOne and return a field element.
r.prod(t, domain.invMaxDegree)
r *= sync(globalSum)
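# Note on the barycentric identity used above (stated here for reference):
# with ωᴺ = 1 and z not a root of unity,
#   p(z) = (zᴺ - 1)/N · Σᵢ p(ωⁱ)·ωⁱ/(z - ωⁱ)
#        = (1 - zᴺ)/N · Σᵢ p(ωⁱ)·ωⁱ/(ωⁱ - z)
# hence the code multiplies the parallel sum Σᵢ ωⁱ/(ωⁱ-z)·p(ωⁱ) by
# (1 - zᴺ), computed with log₂(N) squarings of z, and by 1/N (invMaxDegree).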
proc differenceQuotientEvalOffDomain_parallel*[N: static int, Field](
tp: Threadpool,
r: ptr PolynomialEval[N, Field],
poly: ptr PolynomialEval[N, Field],
pZ: ptr Field,
invRootsMinusZ: ptr array[N, Field]) =
## Compute r(x) = (p(x) - p(z)) / (x - z)
##
## for z != ωⁱ a power of a root of unity
##
## Input:
## - invRootsMinusZ: 1/(ωⁱ-z)
## - poly: p(x) a polynomial in evaluation form as an array of p(ωⁱ)
## - rootsOfUnity: ωⁱ
## - p(z)
##
## Parallelism: This only returns when computation is fully done
# TODO: we might want either awaitable for-loops
# or awaitable individual iterations
# for latency-hiding techniques
syncScope:
tp.parallelFor i in 0 ..< N:
captures: {r, poly, pZ, invRootsMinusZ}
# qᵢ = (p(ωⁱ) - p(z))/(ωⁱ-z)
var qi {.noinit.}: Field
qi.diff(poly.evals[i], pZ[])
r.evals[i].prod(qi, invRootsMinusZ[i])
proc differenceQuotientEvalInDomain_parallel*[N: static int, Field](
tp: Threadpool,
r: ptr PolynomialEval[N, Field],
poly: ptr PolynomialEval[N, Field],
zIndex: uint32,
invRootsMinusZ: ptr array[N, Field],
domain: ptr PolyDomainEval[N, Field],
isBitReversedDomain: static bool) =
## Compute r(x) = (p(x) - p(z)) / (x - z)
##
## for z = ωⁱ a power of a root of unity
##
## Input:
## - poly: p(x) a polynomial in evaluation form as an array of p(ωⁱ)
## - rootsOfUnity: ωⁱ
## - invRootsMinusZ: 1/(ωⁱ-z)
## - zIndex: the index of the root of unity power that matches z = ωⁱᵈˣ
##
## Parallelism: This only returns when computation is fully done
static:
# For powers of 2: x mod N == x and (N-1)
doAssert N.isPowerOf2_vartime()
mixin evalsZindex
tp.parallelFor i in 0 ..< N:
captures: {r, poly, domain, invRootsMinusZ, zIndex}
reduceInto(evalsZindex: Field):
prologue:
var worker_ri {.noInit.}: Field
worker_ri.setZero()
forLoop:
var iter_ri {.noInit.}: Field
if i == int(zIndex):
iter_ri.setZero()
else:
# qᵢ = (p(ωⁱ) - p(z))/(ωⁱ-z)
var qi {.noinit.}: Field
qi.diff(poly.evals[i], poly.evals[zIndex])
r.evals[i].prod(qi, invRootsMinusZ[i])
# q'ᵢ = -qᵢ * ωⁱ/z
# q'idx = ∑ q'ᵢ
iter_ri.neg(r.evals[i]) # -qᵢ
when isBitReversedDomain:
const logN = log2_vartime(uint32 N)
let invZidx = N - reverseBits(uint32 zIndex, logN)
let canonI = reverseBits(uint32 i, logN)
let idx = reverseBits((canonI + invZidx) and (N-1), logN)
iter_ri *= domain.rootsOfUnity[idx] # -qᵢ * ωⁱ/z (explanation at the bottom of serial impl)
else:
iter_ri *= domain.rootsOfUnity[(i+N-zIndex) and (N-1)] # -qᵢ * ωⁱ/z (explanation at the bottom of serial impl)
worker_ri += iter_ri
merge(remote_ri: Flowvar[Field]):
worker_ri += sync(remote_ri)
epilogue:
return worker_ri
r.evals[zIndex] = sync(evalsZindex)

View File

@ -6,7 +6,9 @@
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import std/macros
import
std/macros,
./primitives
# OpenArray type
# ---------------------------------------------------------
@ -29,12 +31,23 @@ type View*[T] = object
template toOpenArray*[T](v: View[T]): openArray[T] =
v.data.toOpenArray(0, v.len-1)
func toView*[T](data: ptr UncheckedArray[T], len: int) {.inline.} =
func toView*[T](oa: openArray[T]): View[T] {.inline.} =
View[T](data: cast[ptr UncheckedArray[T]](oa[0].unsafeAddr), len: oa.len)
func toView*[T](data: ptr UncheckedArray[T], len: int): View[T] {.inline.} =
View[T](data: data, len: len)
func `[]`*[T](v: View[T], idx: int): lent T {.inline.} =
v.data[idx]
func chunk*[T](v: View[T], start, len: int): View[T] {.inline.} =
## Create a sub-chunk from a view
debug:
doAssert start >= 0
doAssert start + len <= v.len
result.data = v.data +% start
result.len = len
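# A small usage sketch of the View helpers added above (the byte payload and
# variable names are illustrative assumptions):
let msg = [byte 0, 1, 2, 3, 4, 5, 6, 7]
let v = msg.toView()            # zero-copy view over the array
let lo = v.chunk(0, 4)          # sub-view over msg[0 ..< 4]
let hi = v.chunk(4, 4)          # sub-view over msg[4 ..< 8]
doAssert lo[0] == 0 and hi[0] == 4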
type MutableView*[T] {.borrow: `.`.} = distinct View[T]
template toOpenArray*[T](v: MutableView[T]): openArray[T] =

View File

@ -198,7 +198,7 @@ func update*[Pubkey: ECP_ShortW_Aff](
augmentation = "", message,
ctx.domainSepTag.toOpenArray(0, ctx.dst_len.int - 1))
ctx.millerAccum.update(pubkey, hmsgG2_aff)
return ctx.millerAccum.update(pubkey, hmsgG2_aff)
else:
# Pubkey on G2, H(message) and Signature on G1
@ -209,7 +209,7 @@ func update*[Pubkey: ECP_ShortW_Aff](
augmentation = "", message,
ctx.domainSepTag.toOpenArray(0, ctx.dst_len.int - 1))
ctx.millerAccum.update(hmsgG1_aff, pubkey)
return ctx.millerAccum.update(hmsgG1_aff, pubkey)
func update*[Pubkey: ECP_ShortW_Aff](
ctx: var BLSAggregateSigAccumulator,
@ -227,6 +227,7 @@ func merge*(ctxDst: var BLSAggregateSigAccumulator, ctxSrc: BLSAggregateSigAccum
return false
ctxDst.millerAccum.merge(ctxSrc.millerAccum)
return true
func finalVerify*[F, G](ctx: var BLSAggregateSigAccumulator, aggregateSignature: ECP_ShortW_Aff[F, G]): bool =
## Finish batch and/or aggregate signature verification and returns the final result.
@ -439,7 +440,7 @@ func update*[Pubkey, Sig: ECP_ShortW_Aff](
augmentation = "", message,
ctx.domainSepTag.toOpenArray(0, ctx.dst_len.int - 1))
ctx.millerAccum.update(pkG1_aff, hmsgG2_aff)
return ctx.millerAccum.update(pkG1_aff, hmsgG2_aff)
else:
# Pubkey on G2, H(message) and Signature on G1
@ -467,7 +468,7 @@ func update*[Pubkey, Sig: ECP_ShortW_Aff](
type FF1 = BLSBatchSigAccumulator.FF1
var hmsgG1_aff {.noInit.}: ECP_ShortW_Aff[FF1, G1]
hmsgG1_aff.affine(hmsgG1_jac)
ctx.millerAccum.update(hmsgG1_aff, pubkey)
return ctx.millerAccum.update(hmsgG1_aff, pubkey)
func update*[Pubkey, Sig: ECP_ShortW_Aff](
ctx: var BLSBatchSigAccumulator,
@ -476,13 +477,25 @@ func update*[Pubkey, Sig: ECP_ShortW_Aff](
signature: Sig): bool {.inline.} =
ctx.update(pubkey, message, signature)
func handover*(ctx: var BLSBatchSigAccumulator) {.inline.} =
## Prepare accumulator for cheaper merging.
##
## In a multi-threaded context, multiple accumulators can be created, each processing a subset of the batch in parallel.
## Accumulators can then be merged:
## merger_accumulator += mergee_accumulator
## Merging triggers an expensive reduction (a multi-Miller loop) whenever the accumulation threshold of 8 is reached.
## However, merging two already-reduced accumulators is 136x cheaper.
##
## `Handover` forces this reduction on local threads to limit the burden on the merger thread.
ctx.millerAccum.handover()
func merge*(ctxDst: var BLSBatchSigAccumulator, ctxSrc: BLSBatchSigAccumulator): bool =
## Merge 2 BLS signature accumulators: ctxDst <- ctxDst + ctxSrc
##
## Returns false if they have inconsistent DomainSeparationTag and true otherwise.
if ctxDst.dst_len != ctxSrc.dst_len:
return false
if not equalMem(ctxDst.domainSepTag.addr, ctxSrc.domainSepTag.addr, ctxDst.domainSepTag.len):
if not equalMem(ctxDst.domainSepTag.addr, ctxSrc.domainSepTag.unsafeAddr, ctxDst.domainSepTag.len):
return false
ctxDst.millerAccum.merge(ctxSrc.millerAccum)
@ -494,6 +507,7 @@ func merge*(ctxDst: var BLSBatchSigAccumulator, ctxSrc: BLSBatchSigAccumulator):
ctxDst.aggSigOnce = true
BLSBatchSigAccumulator.H.hash(ctxDst.secureBlinding, ctxDst.secureBlinding, ctxSrc.secureBlinding)
return true
func finalVerify*(ctx: var BLSBatchSigAccumulator): bool =
## Finish batch and/or aggregate signature verification and returns the final result.

View File

@ -0,0 +1,181 @@
# Constantine
# Copyright (c) 2018-2019 Status Research & Development GmbH
# Copyright (c) 2020-Present Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
# ############################################################
#
# BLS Signatures
# Parallel edition
#
# ############################################################
when not compileOption("threads"):
{.error: "This requires --threads:on compilation flag".}
# Import all bls_signature including private fields and reexport
import ./bls_signatures{.all.}
export bls_signatures
import
# Standard library
std/atomics,
# Constantine
../threadpool/[threadpool, partitioners],
../platforms/[abstractions, allocs, views],
../serialization/endians,
../hashes,
../math/ec_shortweierstrass
# No exceptions allowed in core cryptographic operations
{.push raises: [].}
{.push checks: off.}
# Parallelized Batch Verifier
# ----------------------------------------------------------------------
# Parallel pairing computation requires the following steps
#
# Assuming we have N (public key, message, signature) triplets to verify
# on P processor/threads.
# We want B batches, with B = P, the number of (idle) processors/threads
# Each processing W work items with W = N/B or N/B + 1
#
# Step 0: Initialize an accumulator per thread.
# Step 1: Compute partial pairings, W work items per thread. (~190μs - Miller loops)
# Step 2: Merge the B partial pairings (~1.3μs - Fp12 multiplications)
# Step 3: Final verification (~233μs - Final Exponentiation)
#
# (Timings are per operation on a 2.6GHz, turbo 5GHz, i9-11980HK CPU for BLS12-381 pairings.)
#
# We rely on the lazy tree splitting
# of Constantine's threadpool to only split computation if there is an idle worker.
# We force the base case for splitting to be 2 for efficiency but
# the actual base case auto-adapts to runtime conditions
# and may be 100 for example if all other threads are busy.
#
# In Ethereum consensus, blocks may require up to 6 verifications:
# - block proposals signatures
# - randao reveal signatures
# - proposer slashings signatures
# - attester slashings signatures
# - attestations signatures
# - validator exits signatures
# not counting deposit signatures, which may be invalid
#
# And signature verification is the bottleneck for fast syncing and may add
# hours or days to sync time.
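#
# Worked example with the timings above (illustrative only, not a benchmark):
# N = 128 triplets on P = 8 threads,
#   serial:   128 × 190μs + 233μs                 ≈ 24.6 ms
#   parallel: (128/8) × 190μs + 7 × 1.3μs + 233μs ≈  3.3 ms (~7.5x)
# The Miller loops parallelize almost perfectly; the single final
# exponentiation is the main remaining serial cost.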
proc batchVerify_parallel*[Msg, Pubkey, Sig](
tp: Threadpool,
pubkeys: openArray[Pubkey],
messages: openArray[Msg],
signatures: openArray[Sig],
H: type CryptoHash,
k: static int,
domainSepTag: openArray[byte],
secureRandomBytes: array[32, byte]): bool {.noInline, genCharAPI.} =
## Verify that all (pubkey, message, signature) triplets are valid
##
## Returns false if there is at least one incorrect signature
##
## Assumes pubkeys and signatures have been checked for non-infinity and group-checked.
##
## This requires cryptographically-secure random bytes
## for scalar blinding
## to defend against forged signatures that would not
## verify individually but would verify while aggregated.
## I.e. we need an input that is not under the attacker's control.
##
## The blinding scheme also assumes that the attacker cannot
## resubmit forged (publickey, message, signature) triplets 2^64 times
## against the same `secureRandomBytes`
if tp.numThreads == 1:
return batchVerify(pubkeys, messages, signatures, H, k, domainSepTag, secureRandomBytes)
if pubkeys.len == 0:
return false
if pubkeys.len != messages.len or pubkeys.len != signatures.len:
return false
type FF1 = Pubkey.F
type FF2 = Sig.F
type FpK = Sig.F.C.getGT()
# Stage 0a: Setup per-thread accumulators
debug: doAssert pubkeys.len <= 1 shl 32
let N = pubkeys.len.uint32
let numAccums = min(N, tp.numThreads.uint32)
let accums = allocHeapArray(BLSBatchSigAccumulator[H, FF1, FF2, Fpk, ECP_ShortW_Jac[Sig.F, Sig.G], k], numAccums)
# Stage 0b: Setup synchronization
var currentItem {.noInit.}: Atomic[uint32]
var terminateSignal {.noInit.}: Atomic[bool]
currentItem.store(0, moRelaxed)
terminateSignal.store(false, moRelaxed)
# Stage 1: Accumulate partial pairings (Miller Loops)
# ---------------------------------------------------
proc accumulate(
ctx: ptr BLSBatchSigAccumulator,
pubkeys: ptr UncheckedArray[Pubkey],
messages: ptr UncheckedArray[Msg],
signatures: ptr UncheckedArray[Sig],
N: uint32,
domainSepTag: View[byte],
secureRandomBytes: ptr array[32, byte],
accumSepTag: array[sizeof(int), byte],
terminateSignal: ptr Atomic[bool],
currentItem: ptr Atomic[uint32]): bool {.nimcall, gcsafe.} =
ctx[].init(
domainSepTag.toOpenArray(),
secureRandomBytes[],
accumSepTag)
while not terminateSignal[].load(moRelaxed):
let i = currentItem[].fetchAdd(1, moRelaxed)
if i >= N:
break
if not ctx[].update(pubkeys[i], messages[i], signatures[i]):
terminateSignal[].store(true, moRelaxed)
return false
ctx[].handover()
return true
# Stage 2: Schedule work
# ---------------------------------------------------
let partialStates = allocStackArray(Flowvar[bool], numAccums)
for id in 0 ..< numAccums:
partialStates[id] = tp.spawn accumulate(
accums[id].addr,
pubkeys.asUnchecked(),
messages.asUnchecked(),
signatures.asUnchecked(),
N,
domainSepTag.toView(),
secureRandomBytes.unsafeAddr,
id.uint.toBytes(bigEndian),
terminateSignal.addr,
currentItem.addr)
# Stage 3: Reduce partial pairings
# --------------------------------
# Linear merge with latency hiding; a parallel logarithmic merge (binary-tree / divide-and-conquer) could be considered later
block HappyPath: # sync must be called even if result is false in the middle to avoid tasks leaking
result = sync partialStates[0]
for i in 1 ..< numAccums:
result = result and sync partialStates[i]
if result: # As long as no error is returned, accumulate
result = result and accums[0].merge(accums[i])
if not result: # Don't proceed to final exponentiation if there is already an error
break HappyPath
result = accums[0].finalVerify()
freeHeap(accums)

View File

@ -337,7 +337,7 @@ proc main() =
flt = ru.ru_minflt
let start = wtime_msec()
var tp = Threadpool.new(numThreads = nthreads)
let tp = Threadpool.new(numThreads = nthreads)
tp.blackScholesConstantine(ctx.addr)
tp.shutdown()

View File

@ -223,7 +223,7 @@ when compileOption("threads"):
# We need the buffer raw address
let buf = cast[ptr UncheckedArray[Vec]](C[0].addr)
var tp = Threadpool.new()
let tp = Threadpool.new()
tp.parallelFor y in 0 ..< h: # Loop over image rows
captures: {tp, buf, samples}
@ -269,7 +269,7 @@ when compileOption("threads"):
# We need the buffer raw address
let buf = cast[ptr UncheckedArray[Vec]](C[0].addr)
var tp = Threadpool.new()
let tp = Threadpool.new()
tp.parallelFor y in 0 ..< h: # Loop over image rows
captures: {buf, samples}

View File

@ -16,7 +16,7 @@ block: # Async without result
echo "\nSanity check 1: Printing 123456 654321 in parallel"
var tp = Threadpool.new(numThreads = 4)
let tp = Threadpool.new(numThreads = 4)
tp.spawn displayInt(123456)
tp.spawn displayInt(654321)
tp.shutdown()

View File

@ -28,7 +28,7 @@ proc main() =
var n = 1_000_000
var nthreads = countProcessors()
var tp = Threadpool.new(num_threads = nthreads) # Default to the number of hardware threads.
let tp = Threadpool.new(num_threads = nthreads) # Default to the number of hardware threads.
echo formatFloat(tp.piApprox(n))

View File

@ -6,7 +6,7 @@ block:
echo "Running 'threadpool/examples/e03_parallel_for.nim'"
echo "=============================================================================================="
var tp = Threadpool.new(numThreads = 4)
let tp = Threadpool.new(numThreads = 4)
tp.parallelFor i in 0 ..< 100:
log("%d\n", i)
@ -24,7 +24,7 @@ block: # Capturing outside scope
echo "Running 'threadpool/examples/e03_parallel_for.nim'"
echo "=============================================================================================="
var tp = Threadpool.new(numThreads = 4)
let tp = Threadpool.new(numThreads = 4)
var a = 100
var b = 10
@ -45,7 +45,7 @@ block: # Nested loops
echo "Running 'threadpool/examples/e03_parallel_for.nim'"
echo "=============================================================================================="
var tp = Threadpool.new(numThreads = 4)
let tp = Threadpool.new(numThreads = 4)
tp.parallelFor i in 0 ..< 4:
tp.parallelFor j in 0 ..< 8:

View File

@ -20,7 +20,7 @@ block:
result = sync(globalSum)
var tp = Threadpool.new(numThreads = 4)
let tp = Threadpool.new(numThreads = 4)
let sum1M = tp.sumReduce(1000000)
echo "Sum reduce(0..1000000): ", sum1M

View File

@ -38,7 +38,7 @@ import
proc needTempStorage(argTy: NimNode): bool =
case argTy.kind
of nnkVarTy:
error("It is unsafe to capture a `var` parameter and pass it to another thread. Its memory location could be invalidated if the spawning proc returns before the worker thread finishes.")
error("It is unsafe to capture a `var` parameter '" & repr(argTy) & "' and pass it to another thread. Its memory location could be invalidated if the spawning proc returns before the worker thread finishes.")
of nnkStaticTy:
return false
of nnkBracketExpr:

View File

@ -950,7 +950,7 @@ proc new*(T: type Threadpool, numThreads = countProcessors()): T {.raises: [Reso
## will not impact correctness but may impact performance.
type TpObj = typeof(default(Threadpool)[]) # due to C import, we need a dynamic sizeof
var tp = allocHeapUncheckedAlignedPtr(Threadpool, sizeof(TpObj), alignment = 64)
let tp = allocHeapUncheckedAlignedPtr(Threadpool, sizeof(TpObj), alignment = 64)
tp.barrier.init(numThreads.uint32)
tp.globalBackoff.initialize()
@ -978,7 +978,7 @@ proc new*(T: type Threadpool, numThreads = countProcessors()): T {.raises: [Reso
profileStart(run_task)
return tp
proc cleanup(tp: var Threadpool) {.raises: [].} =
proc cleanup(tp: Threadpool) {.raises: [].} =
## Cleanup all resources allocated by the threadpool
preCondition: workerContext.currentTask.isRootTask()
@ -993,7 +993,7 @@ proc cleanup(tp: var Threadpool) {.raises: [].} =
tp.freeHeapAligned()
proc shutdown*(tp: var Threadpool) {.raises:[].} =
proc shutdown*(tp: Threadpool) {.raises:[].} =
## Wait until all tasks are processed and then shutdown the threadpool
preCondition: workerContext.currentTask.isRootTask()
tp.syncAll()

View File

@ -83,7 +83,7 @@ proc run_EC_batch_add_parallel_impl*[N: static int](
for n in numPoints:
test $ec & " parallel sum reduction (N=" & $n & ")":
proc test(EC: typedesc, gen: RandomGen) =
var tp = Threadpool.new()
let tp = Threadpool.new()
defer: tp.shutdown()
var points = newSeq[ECP_ShortW_Aff[EC.F, EC.G]](n)
@ -108,7 +108,7 @@ proc run_EC_batch_add_parallel_impl*[N: static int](
test "EC " & $ec.G & " parallel sum reduction (N=" & $n & ") - special cases":
proc test(EC: typedesc, gen: RandomGen) =
var tp = Threadpool.new()
let tp = Threadpool.new()
defer: tp.shutdown()
var points = newSeq[ECP_ShortW_Aff[EC.F, EC.G]](n)
@ -162,7 +162,7 @@ proc run_EC_multi_scalar_mul_parallel_impl*[N: static int](
let bucketBits = bestBucketBitSize(n, ec.F.C.getCurveOrderBitwidth(), useSignedBuckets = false, useManualTuning = false)
test $ec & " Parallel Multi-scalar-mul (N=" & $n & ", bucket bits: " & $bucketBits & ")":
proc test(EC: typedesc, gen: RandomGen) =
var tp = Threadpool.new()
let tp = Threadpool.new()
defer: tp.shutdown()
var points = newSeq[ECP_ShortW_Aff[EC.F, EC.G]](n)
var coefs = newSeq[BigInt[EC.F.C.getCurveOrderBitwidth()]](n)

View File

@ -9,9 +9,10 @@
import
std/[os, unittest, strutils],
pkg/jsony,
../constantine/ethereum_bls_signatures,
../constantine/ethereum_bls_signatures_parallel,
../constantine/serialization/codecs,
../constantine/hashes
../constantine/hashes,
../constantine/threadpool/threadpool
type
# https://github.com/ethereum/bls12-381-tests/blob/master/formats/
@ -301,6 +302,13 @@ testGen(batch_verify, testVector, BatchVerify_test):
status[0] = pubkeys.batch_verify(testVector.input.messages, signatures, randomBytes)
let tp = Threadpool.new(numThreads = 4)
let parallelStatus = tp.batch_verify_parallel(pubkeys, testVector.input.messages, signatures, randomBytes)
doAssert status[0] == parallelStatus, block:
"\nSerial status: " & $status[0] &
"\nParallel status: " & $parallelStatus & '\n'
tp.shutdown()
let success = status == (cttBLS_Success, cttCodecEcc_Success)
doAssert success == testVector.output, block:
"Verification differs from expected \n" &

View File

@ -0,0 +1 @@
--threads:on

View File

@ -14,7 +14,7 @@ import
# Internals
../constantine/hashes,
../constantine/serialization/codecs,
../constantine/ethereum_eip4844_kzg_polynomial_commitments
../constantine/ethereum_eip4844_kzg
# Organization
#

View File

@ -0,0 +1,274 @@
# Constantine
# Copyright (c) 2018-2019 Status Research & Development GmbH
# Copyright (c) 2020-Present Mamy André-Ratsimbazafy
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
# Standard library
std/[os, strutils, streams, unittest],
# 3rd party
pkg/yaml,
# Internals
../constantine/hashes,
../constantine/serialization/codecs,
../constantine/ethereum_eip4844_kzg_parallel,
../constantine/threadpool/threadpool
# Organization
#
# We choose not to use a type schema here, unlike with the other json-based tests
# like:
# - t_ethereum_bls_signatures
# - t_ethereum_evm_precompiles
#
# A type schema would add a lot of verbosity due to all the KZG types
# and failure modes (subgroups, ...)
# https://nimyaml.org/serialization.html
const
TestVectorsDir =
currentSourcePath.rsplit(DirSep, 1)[0] / "protocol_ethereum_eip4844_deneb_kzg"
const SkippedTests = [
""
]
iterator walkTests*(testDir: string, skipped: var int): (string, string) =
for file in walkDirRec(testDir, relative = true):
if file in SkippedTests:
echo "[WARNING] Skipping - ", file
inc skipped
continue
yield (testDir, file)
proc loadVectors(filename: string): YamlNode =
var s = filename.openFileStream()
defer: s.close()
load(s, result)
template testGen*(name, testData: untyped, body: untyped): untyped {.dirty.} =
## Generates a test proc
## with identifier "test_name"
## The test vector data is available as JsonNode under the
## the variable passed as `testData`
proc `test _ name`(tp: Threadpool, ctx: ptr EthereumKZGContext) =
var count = 0 # Need to fail if walkDir doesn't return anything
var skipped = 0
const testdir = TestVectorsDir / astToStr(name)/"small"
for dir, file in walkTests(testdir, skipped):
stdout.write(" " & alignLeft(astToStr(name) & " test:", 36) & alignLeft(file, 90))
let testData = loadVectors(dir/file)
body
inc count
doAssert count > 0, "Empty or non-existent test folder: " & astToStr(name)
if skipped > 0:
echo "[Warning]: ", skipped, " tests skipped."
template parseAssign(dstVariable: untyped, size: static int, hexInput: string) =
block:
let prefixBytes = 2*int(hexInput.startsWith("0x"))
let expectedLength = size*2 + prefixBytes
if hexInput.len != expectedLength:
let encodedBytes = (hexInput.len - prefixBytes) div 2
stdout.write "[ Incorrect input length for '" &
astToStr(dstVariable) &
"': encoding " & $encodedBytes & " bytes" &
" instead of expected " & $size & " ]\n"
doAssert testVector["output"].content == "null"
# We're in a template; this `continue` short-circuits the caller's `walkTests` loop
continue
var dstVariable{.inject.} = new(array[size, byte])
dstVariable[].fromHex(hexInput)
template parseAssignList(dstVariable: untyped, elemSize: static int, hexListInput: YamlNode) =
var dstVariable{.inject.} = newSeq[array[elemSize, byte]]()
block exitHappyPath:
block exitException:
for elem in hexListInput:
let hexInput = elem.content
let prefixBytes = 2*int(hexInput.startsWith("0x"))
let expectedLength = elemSize*2 + prefixBytes
if hexInput.len != expectedLength:
let encodedBytes = (hexInput.len - prefixBytes) div 2
stdout.write "[ Incorrect input length for '" &
astToStr(dstVariable) &
"': encoding " & $encodedBytes & " bytes" &
" instead of expected " & $elemSize & " ]\n"
doAssert testVector["output"].content == "null"
break exitException
else:
dstVariable.setLen(dstVariable.len + 1)
dstVariable[^1].fromHex(hexInput)
break exitHappyPath
# We're in a template; this `continue` short-circuits the caller's `walkTests` loop
continue
testGen(blob_to_kzg_commitment, testVector):
parseAssign(blob, 32*4096, testVector["input"]["blob"].content)
var commitment: array[48, byte]
let status = tp.blob_to_kzg_commitment_parallel(ctx, commitment, blob[].addr)
stdout.write "[" & $status & "]\n"
if status == cttEthKZG_Success:
parseAssign(expectedCommit, 48, testVector["output"].content)
doAssert bool(commitment == expectedCommit[]), block:
"\ncommitment: " & commitment.toHex() &
"\nexpected: " & expectedCommit[].toHex() & "\n"
else:
doAssert testVector["output"].content == "null"
testGen(compute_kzg_proof, testVector):
parseAssign(blob, 32*4096, testVector["input"]["blob"].content)
parseAssign(z, 32, testVector["input"]["z"].content)
var proof: array[48, byte]
var y: array[32, byte]
let status = tp.compute_kzg_proof_parallel(ctx, proof, y, blob[].addr, z[])
stdout.write "[" & $status & "]\n"
if status == cttEthKZG_Success:
parseAssign(expectedEvalAtChallenge, 32, testVector["output"][1].content)
parseAssign(expectedProof, 48, testVector["output"][0].content)
doAssert bool(y == expectedEvalAtChallenge[]), block:
"\ny (= p(z)): " & y.toHex() &
"\nexpected: " & expectedEvalAtChallenge[].toHex() & "\n"
doAssert bool(proof == expectedProof[]), block:
"\nproof: " & proof.toHex() &
"\nexpected: " & expectedProof[].toHex() & "\n"
else:
doAssert testVector["output"].content == "null"
testGen(verify_kzg_proof, testVector):
parseAssign(commitment, 48, testVector["input"]["commitment"].content)
parseAssign(z, 32, testVector["input"]["z"].content)
parseAssign(y, 32, testVector["input"]["y"].content)
parseAssign(proof, 48, testVector["input"]["proof"].content)
let status = verify_kzg_proof(ctx, commitment[], z[], y[], proof[])
stdout.write "[" & $status & "]\n"
if status == cttEthKZG_Success:
doAssert testVector["output"].content == "true"
elif status == cttEthKZG_VerificationFailure:
doAssert testVector["output"].content == "false"
else:
doAssert testVector["output"].content == "null"
testGen(compute_blob_kzg_proof, testVector):
parseAssign(blob, 32*4096, testVector["input"]["blob"].content)
parseAssign(commitment, 48, testVector["input"]["commitment"].content)
var proof: array[48, byte]
let status = tp.compute_blob_kzg_proof_parallel(ctx, proof, blob[].addr, commitment[])
stdout.write "[" & $status & "]\n"
if status == cttEthKZG_Success:
parseAssign(expectedProof, 48, testVector["output"].content)
doAssert bool(proof == expectedProof[]), block:
"\nproof: " & proof.toHex() &
"\nexpected: " & expectedProof[].toHex() & "\n"
else:
doAssert testVector["output"].content == "null"
testGen(verify_blob_kzg_proof, testVector):
parseAssign(blob, 32*4096, testVector["input"]["blob"].content)
parseAssign(commitment, 48, testVector["input"]["commitment"].content)
parseAssign(proof, 48, testVector["input"]["proof"].content)
let status = tp.verify_blob_kzg_proof_parallel(ctx, blob[].addr, commitment[], proof[])
stdout.write "[" & $status & "]\n"
if status == cttEthKZG_Success:
doAssert testVector["output"].content == "true"
elif status == cttEthKZG_VerificationFailure:
doAssert testVector["output"].content == "false"
else:
doAssert testVector["output"].content == "null"
testGen(verify_blob_kzg_proof_batch, testVector):
parseAssignList(blobs, 32*4096, testVector["input"]["blobs"])
parseAssignList(commitments, 48, testVector["input"]["commitments"])
parseAssignList(proofs, 48, testVector["input"]["proofs"])
if blobs.len != commitments.len:
stdout.write "[ Length mismatch between blobs and commitments ]\n"
doAssert testVector["output"].content == "null"
continue
if blobs.len != proofs.len:
stdout.write "[ Length mismatch between blobs and proofs ]\n"
doAssert testVector["output"].content == "null"
continue
# For reproducibility/debugging we don't use the CSPRNG here
var randomBlinding {.noInit.}: array[32, byte]
sha256.hash(randomBlinding, "The wizard quickly jinxed the gnomes before they vaporized.")
template asUnchecked[T](a: openArray[T]): ptr UncheckedArray[T] =
if a.len > 0:
cast[ptr UncheckedArray[T]](a[0].unsafeAddr)
else:
nil
let status = tp.verify_blob_kzg_proof_batch_parallel(
ctx,
blobs.asUnchecked(),
commitments.asUnchecked(),
proofs.asUnchecked(),
blobs.len,
randomBlinding)
stdout.write "[" & $status & "]\n"
if status == cttEthKZG_Success:
doAssert testVector["output"].content == "true"
elif status == cttEthKZG_VerificationFailure:
doAssert testVector["output"].content == "false"
else:
doAssert testVector["output"].content == "null"
block:
suite "Ethereum Deneb Hardfork / EIP-4844 / Proto-Danksharding / KZG Polynomial Commitments (Parallel)":
let ctx = load_ethereum_kzg_test_trusted_setup_mainnet()
let tp = Threadpool.new()
test "blob_to_kzg_commitment_parallel(tp: Threadpool, dst: var array[48, byte], blob: ptr array[4096, byte])":
test_blob_to_kzg_commitment(tp, ctx)
test "compute_kzg_proof_parallel(tp: Threadpool, proof: var array[48, byte], y: var array[32, byte], blob: ptr array[4096, byte], z: array[32, byte])":
test_compute_kzg_proof(tp, ctx)
# Not parallelized
# test "verify_kzg_proof(commitment: array[48, byte], z, y: array[32, byte], proof: array[48, byte]) -> bool":
# test_verify_kzg_proof(tp, ctx)
test "compute_blob_kzg_proof_parallel(tp: Threadpool, proof: var array[48, byte], blob: ptr array[4096, byte], commitment: array[48, byte])":
test_compute_blob_kzg_proof(tp, ctx)
test "verify_blob_kzg_proof_parallel(tp: Threadpool, blob: ptr array[4096, byte], commitment, proof: array[48, byte])":
test_verify_blob_kzg_proof(tp, ctx)
test "verify_blob_kzg_proof_batch_parallel(tp: Threadpool, blobs: ptr UncheckedArray[array[4096, byte]], commitments, proofs: ptr UncheckedArray[array[48, byte]], n: int, secureRandomBytes: array[32, byte])":
test_verify_blob_kzg_proof_batch(tp, ctx)
tp.shutdown()
ctx.delete()

View File

@ -0,0 +1,3 @@
# NimYAML requires ORC instead of ARC for memory management to deal with cycles
--mm:orc
--threads:on