Speed up altair block processing 2x (#3115)

* Speed up altair block processing >2x

Like #3089, this PR drastially speeds up historical REST queries and
other long state replays.

* cache sync committee validator indices
* use ~80mb less memory for validator pubkey mappings
* batch-verify sync aggregate signature (fixes #2985)
* document sync committee hack with head block vs sync message block
* add batch signature verification failure tests

Before:

```
../env.sh nim c -d:release -r ncli_db --db:mainnet_0/db bench --start-slot:-1000
All time are ms
     Average,       StdDev,          Min,          Max,      Samples,         Test
Validation is turned off meaning that no BLS operations are performed
    5830.675,        0.000,     5830.675,     5830.675,            1, Initialize DB
       0.481,        1.878,        0.215,       59.167,          981, Load block from database
    8422.566,        0.000,     8422.566,     8422.566,            1, Load state from database
       6.996,        1.678,        0.042,       14.385,          969, Advance slot, non-epoch
      93.217,        8.318,       84.192,      122.209,           32, Advance slot, epoch
      20.513,       23.665,       11.510,      201.561,          981, Apply block, no slot processing
       0.000,        0.000,        0.000,        0.000,            0, Database load
       0.000,        0.000,        0.000,        0.000,            0, Database store
```

After:

```
    7081.422,        0.000,     7081.422,     7081.422,            1, Initialize DB
       0.553,        2.122,        0.175,       66.692,          981, Load block from database
    5439.446,        0.000,     5439.446,     5439.446,            1, Load state from database
       6.829,        1.575,        0.043,       12.156,          969, Advance slot, non-epoch
      94.716,        2.749,       88.395,      100.026,           32, Advance slot, epoch
      11.636,       23.766,        4.889,      205.250,          981, Apply block, no slot processing
       0.000,        0.000,        0.000,        0.000,            0, Database load
       0.000,        0.000,        0.000,        0.000,            0, Database store
```

* add comment
This commit is contained in:
Jacek Sieka 2021-11-24 13:43:50 +01:00 committed by GitHub
parent 88c623e250
commit 9c2f43ed0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 252 additions and 88 deletions

View File

@ -71,6 +71,11 @@ OK: 16/16 Fail: 0/16 Skip: 0/16
+ latest_block_root OK + latest_block_root OK
``` ```
OK: 3/3 Fail: 0/3 Skip: 0/3 OK: 3/3 Fail: 0/3 Skip: 0/3
## Block pool altair processing [Preset: mainnet]
```diff
+ Invalid signatures [Preset: mainnet] OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
## Block pool processing [Preset: mainnet] ## Block pool processing [Preset: mainnet]
```diff ```diff
+ Adding the same block twice returns a Duplicate error [Preset: mainnet] OK + Adding the same block twice returns a Duplicate error [Preset: mainnet] OK
@ -165,7 +170,7 @@ OK: 7/7 Fail: 0/7 Skip: 0/7
## Gossip validation [Preset: mainnet] ## Gossip validation [Preset: mainnet]
```diff ```diff
+ Any committee index is valid OK + Any committee index is valid OK
+ Validation sanity OK + validateAttestation OK
``` ```
OK: 2/2 Fail: 0/2 Skip: 0/2 OK: 2/2 Fail: 0/2 Skip: 0/2
## Gossip validation - Extra ## Gossip validation - Extra
@ -366,4 +371,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 1/1 Fail: 0/1 Skip: 0/1 OK: 1/1 Fail: 0/1 Skip: 0/1
---TOTAL--- ---TOTAL---
OK: 206/208 Fail: 0/208 Skip: 2/208 OK: 207/209 Fail: 0/209 Skip: 2/209

View File

@ -198,6 +198,12 @@ type
onFinHappened*: OnFinalizedCallback onFinHappened*: OnFinalizedCallback
## On finalization callback ## On finalization callback
headSyncCommittees*: SyncCommitteeCache ##\
## A cache of the sync committees, as they appear in the head state -
## using the head state is slightly wrong - if a reorg deeper than
## EPOCHS_PER_SYNC_COMMITTEE_PERIOD is happening, some valid sync
## committee messages will be rejected
EpochKey* = object EpochKey* = object
## The epoch key fully determines the shuffling for proposers and ## The epoch key fully determines the shuffling for proposers and
## committees in a beacon state - the epoch level information in the state ## committees in a beacon state - the epoch level information in the state

View File

@ -151,7 +151,7 @@ func init*(
finalized_checkpoint: getStateField(state.data, finalized_checkpoint), finalized_checkpoint: getStateField(state.data, finalized_checkpoint),
shuffled_active_validator_indices: shuffled_active_validator_indices:
cache.get_shuffled_active_validator_indices(state.data, epoch) cache.get_shuffled_active_validator_indices(state.data, epoch)
) )
for i in 0'u64..<SLOTS_PER_EPOCH: for i in 0'u64..<SLOTS_PER_EPOCH:
epochRef.beacon_proposers[i] = get_beacon_proposer_index( epochRef.beacon_proposers[i] = get_beacon_proposer_index(
@ -548,6 +548,10 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
# have a cache # have a cache
dag.updateValidatorKeys(getStateField(dag.headState.data, validators).asSeq()) dag.updateValidatorKeys(getStateField(dag.headState.data, validators).asSeq())
withState(dag.headState.data):
when stateFork >= BeaconStateFork.Altair:
dag.headSyncCommittees = state.data.get_sync_committee_cache(cache)
info "Block dag initialized", info "Block dag initialized",
head = shortLog(headRef), head = shortLog(headRef),
finalizedHead = shortLog(dag.finalizedHead), finalizedHead = shortLog(dag.finalizedHead),
@ -1037,8 +1041,8 @@ proc pruneBlocksDAG(dag: ChainDAGRef) =
dagPruneDur = Moment.now() - startTick dagPruneDur = Moment.now() - startTick
iterator syncSubcommittee*( iterator syncSubcommittee*(
syncCommittee: openArray[ValidatorPubKey], syncCommittee: openArray[ValidatorIndex],
subcommitteeIdx: SyncSubcommitteeIndex): ValidatorPubKey = subcommitteeIdx: SyncSubcommitteeIndex): ValidatorIndex =
var var
i = subcommitteeIdx.asInt * SYNC_SUBCOMMITTEE_SIZE i = subcommitteeIdx.asInt * SYNC_SUBCOMMITTEE_SIZE
onePastEndIdx = min(syncCommittee.len, i + SYNC_SUBCOMMITTEE_SIZE) onePastEndIdx = min(syncCommittee.len, i + SYNC_SUBCOMMITTEE_SIZE)
@ -1060,7 +1064,7 @@ iterator syncSubcommitteePairs*(
inc i inc i
func syncCommitteeParticipants*(dag: ChainDAGRef, func syncCommitteeParticipants*(dag: ChainDAGRef,
slot: Slot): seq[ValidatorPubKey] = slot: Slot): seq[ValidatorIndex] =
withState(dag.headState.data): withState(dag.headState.data):
when stateFork >= BeaconStateFork.Altair: when stateFork >= BeaconStateFork.Altair:
let let
@ -1068,28 +1072,25 @@ func syncCommitteeParticipants*(dag: ChainDAGRef,
curPeriod = sync_committee_period(state.data.slot) curPeriod = sync_committee_period(state.data.slot)
if period == curPeriod: if period == curPeriod:
@(state.data.current_sync_committee.pubkeys.data) @(dag.headSyncCommittees.current_sync_committee)
elif period == curPeriod + 1: elif period == curPeriod + 1:
@(state.data.current_sync_committee.pubkeys.data) @(dag.headSyncCommittees.next_sync_committee)
else: @[] else: @[]
else: else:
@[] @[]
func getSubcommitteePositionsAux( func getSubcommitteePositionsAux(
dag: ChainDAGRef, dag: ChainDAGRef,
syncCommittee: openarray[ValidatorPubKey], syncCommittee: openArray[ValidatorIndex],
subcommitteeIdx: SyncSubcommitteeIndex, subcommitteeIdx: SyncSubcommitteeIndex,
validatorIdx: uint64): seq[uint64] = validatorIdx: uint64): seq[uint64] =
# TODO Can we avoid the key conversions by getting a compressed key
# out of ImmutableValidatorData2? If we had this, we can define
# the function `dag.validatorKeyBytes` and use it here.
let validatorKey = dag.validatorKey(validatorIdx) let validatorKey = dag.validatorKey(validatorIdx)
if validatorKey.isNone(): if validatorKey.isNone():
return @[] return @[]
let validatorPubKey = validatorKey.get().toPubKey let validatorPubKey = validatorKey.get().toPubKey
for pos, key in toSeq(syncCommittee.syncSubcommittee(subcommitteeIdx)): for pos, key in toSeq(syncCommittee.syncSubcommittee(subcommitteeIdx)):
if validatorPubKey == key: if validatorIdx == uint64(key):
result.add uint64(pos) result.add uint64(pos)
func getSubcommitteePositions*(dag: ChainDAGRef, func getSubcommitteePositions*(dag: ChainDAGRef,
@ -1102,14 +1103,14 @@ func getSubcommitteePositions*(dag: ChainDAGRef,
period = sync_committee_period(slot) period = sync_committee_period(slot)
curPeriod = sync_committee_period(state.data.slot) curPeriod = sync_committee_period(state.data.slot)
template search(syncCommittee: openarray[ValidatorPubKey]): seq[uint64] = template search(syncCommittee: openArray[ValidatorIndex]): seq[uint64] =
dag.getSubcommitteePositionsAux( dag.getSubcommitteePositionsAux(
syncCommittee, subcommitteeIdx, validatorIdx) syncCommittee, subcommitteeIdx, validatorIdx)
if period == curPeriod: if period == curPeriod:
search(state.data.current_sync_committee.pubkeys.data) search(dag.headSyncCommittees.current_sync_committee)
elif period == curPeriod + 1: elif period == curPeriod + 1:
search(state.data.current_sync_committee.pubkeys.data) search(dag.headSyncCommittees.next_sync_committee)
else: @[] else: @[]
else: else:
@[] @[]
@ -1117,16 +1118,16 @@ func getSubcommitteePositions*(dag: ChainDAGRef,
template syncCommitteeParticipants*( template syncCommitteeParticipants*(
dag: ChainDAGRef, dag: ChainDAGRef,
slot: Slot, slot: Slot,
subcommitteeIdx: SyncSubcommitteeIndex): seq[ValidatorPubKey] = subcommitteeIdx: SyncSubcommitteeIndex): seq[ValidatorIndex] =
toSeq(syncSubcommittee(dag.syncCommitteeParticipants(slot), subcommitteeIdx)) toSeq(syncSubcommittee(dag.syncCommitteeParticipants(slot), subcommitteeIdx))
iterator syncCommitteeParticipants*( iterator syncCommitteeParticipants*(
dag: ChainDAGRef, dag: ChainDAGRef,
slot: Slot, slot: Slot,
subcommitteeIdx: SyncSubcommitteeIndex, subcommitteeIdx: SyncSubcommitteeIndex,
aggregationBits: SyncCommitteeAggregationBits): ValidatorPubKey = aggregationBits: SyncCommitteeAggregationBits): ValidatorIndex =
for pos, valIdx in pairs(dag.syncCommitteeParticipants(slot, subcommitteeIdx)): for pos, valIdx in pairs(dag.syncCommitteeParticipants(slot, subcommitteeIdx)):
if aggregationBits[pos]: if pos < aggregationBits.bits and aggregationBits[pos]:
yield valIdx yield valIdx
func needStateCachesAndForkChoicePruning*(dag: ChainDAGRef): bool = func needStateCachesAndForkChoicePruning*(dag: ChainDAGRef): bool =
@ -1207,6 +1208,10 @@ proc updateHead*(
dag.db.putHeadBlock(newHead.root) dag.db.putHeadBlock(newHead.root)
withState(dag.headState.data):
when stateFork >= BeaconStateFork.Altair:
dag.headSyncCommittees = state.data.get_sync_committee_cache(cache)
let let
finalizedHead = newHead.atEpochStart( finalizedHead = newHead.atEpochStart(
getStateField(dag.headState.data, finalized_checkpoint).epoch) getStateField(dag.headState.data, finalized_checkpoint).epoch)

View File

@ -785,7 +785,7 @@ proc validateSyncCommitteeMessage*(
ok((positionsInSubcommittee, cookedSignature.get())) ok((positionsInSubcommittee, cookedSignature.get()))
# https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.8/specs/altair/p2p-interface.md#sync_committee_contribution_and_proof # https://github.com/ethereum/eth2.0-specs/blob/v1.1.5/specs/altair/p2p-interface.md#sync_committee_contribution_and_proof
proc validateSignedContributionAndProof*( proc validateSignedContributionAndProof*(
dag: ChainDAGRef, dag: ChainDAGRef,
syncCommitteeMsgPool: var SyncCommitteeMsgPool, syncCommitteeMsgPool: var SyncCommitteeMsgPool,
@ -855,16 +855,22 @@ proc validateSignedContributionAndProof*(
initialized = false initialized = false
syncCommitteeSlot = msg.message.contribution.slot + 1 syncCommitteeSlot = msg.message.contribution.slot + 1
for validatorPubKey in dag.syncCommitteeParticipants( for validatorIndex in dag.syncCommitteeParticipants(
syncCommitteeSlot, syncCommitteeSlot,
committeeIdx, committeeIdx,
msg.message.contribution.aggregation_bits): msg.message.contribution.aggregation_bits):
let validatorPubKey = validatorPubKey.loadWithCache.get let validatorPubKey = dag.validatorKey(validatorIndex)
if not validatorPubKey.isSome():
# This should never happen (!)
warn "Invalid validator index in committee cache",
validatorIndex
return errIgnore("SignedContributionAndProof: Invalid committee cache")
if not initialized: if not initialized:
initialized = true initialized = true
committeeAggKey.init(validatorPubKey) committeeAggKey.init(validatorPubKey.get())
else: else:
committeeAggKey.aggregate(validatorPubKey) committeeAggKey.aggregate(validatorPubKey.get())
if not initialized: if not initialized:
# [REJECT] The contribution has participants # [REJECT] The contribution has participants

View File

@ -237,7 +237,7 @@ proc initialize_beacon_state_from_eth1*(
deposits.len) deposits.len)
state.eth1_deposit_index = deposits.lenu64 state.eth1_deposit_index = deposits.lenu64
var pubkeyToIndex = initTable[ValidatorPubKey, int]() var pubkeyToIndex = initTable[ValidatorPubKey, ValidatorIndex]()
for idx, deposit in deposits: for idx, deposit in deposits:
let let
pubkey = deposit.pubkey pubkey = deposit.pubkey
@ -249,7 +249,7 @@ proc initialize_beacon_state_from_eth1*(
do: do:
if skipBlsValidation in flags or if skipBlsValidation in flags or
verify_deposit_signature(cfg, deposit): verify_deposit_signature(cfg, deposit):
pubkeyToIndex[pubkey] = state.validators.len pubkeyToIndex[pubkey] = ValidatorIndex(state.validators.len)
if not state.validators.add(get_validator_from_deposit(deposit)): if not state.validators.add(get_validator_from_deposit(deposit)):
raiseAssert "too many validators" raiseAssert "too many validators"
if not state.balances.add(amount): if not state.balances.add(amount):
@ -707,6 +707,9 @@ func get_next_sync_committee_keys(state: altair.BeaconState | merge.BeaconState)
## Return the sequence of sync committee indices (which may include ## Return the sequence of sync committee indices (which may include
## duplicate indices) for the next sync committee, given a ``state`` at a ## duplicate indices) for the next sync committee, given a ``state`` at a
## sync committee period boundary. ## sync committee period boundary.
# The sync committe depends on seed and effective balance - it can
# thus only be computed for the current epoch of the state, after balance
# updates have been performed
let epoch = get_current_epoch(state) + 1 let epoch = get_current_epoch(state) + 1
@ -744,9 +747,9 @@ proc get_next_sync_committee*(state: altair.BeaconState | merge.BeaconState):
# see signatures_batch, TODO shouldn't be here # see signatures_batch, TODO shouldn't be here
# Deposit processing ensures all keys are valid # Deposit processing ensures all keys are valid
var attestersAgg: AggregatePublicKey var attestersAgg: AggregatePublicKey
attestersAgg.init(res.pubkeys.data[0].loadWithCache().get) attestersAgg.init(res.pubkeys.data[0].load().get)
for i in 1 ..< res.pubkeys.data.len: for i in 1 ..< res.pubkeys.data.len:
attestersAgg.aggregate(res.pubkeys.data[i].loadWithCache().get) attestersAgg.aggregate(res.pubkeys.data[i].load().get)
res.aggregate_pubkey = finish(attestersAgg).toPubKey() res.aggregate_pubkey = finish(attestersAgg).toPubKey()
res res
@ -922,3 +925,36 @@ func latest_block_root*(state: ForkyBeaconState, state_root: Eth2Digest): Eth2Di
func latest_block_root*(state: ForkyHashedBeaconState): Eth2Digest = func latest_block_root*(state: ForkyHashedBeaconState): Eth2Digest =
latest_block_root(state.data, state.root) latest_block_root(state.data, state.root)
func get_sync_committee_cache*(
state: altair.BeaconState | merge.BeaconState, cache: var StateCache):
SyncCommitteeCache =
let period = state.slot.sync_committee_period()
cache.sync_committees.withValue(period, v) do:
return v[]
var
s = toHashSet(state.current_sync_committee.pubkeys.data)
for pk in state.next_sync_committee.pubkeys.data:
s.incl(pk)
var pubkeyIndices: Table[ValidatorPubKey, ValidatorIndex]
for i, v in state.validators:
if v.pubkey in s:
pubkeyIndices[v.pubkey] = i.ValidatorIndex
var res: SyncCommitteeCache
try:
for i in 0..<res.current_sync_committee.len():
res.current_sync_committee[i] =
pubkeyIndices[state.current_sync_committee.pubkeys[i]]
res.next_sync_committee[i] =
pubkeyIndices[state.next_sync_committee.pubkeys[i]]
except KeyError:
raiseAssert "table constructed just above"
cache.sync_committees[period] = res
res

View File

@ -357,7 +357,9 @@ type
## - ProposerSlashing (SignedBeaconBlockHeader) ## - ProposerSlashing (SignedBeaconBlockHeader)
## - AttesterSlashing (IndexedAttestation) ## - AttesterSlashing (IndexedAttestation)
## - SignedVoluntaryExits ## - SignedVoluntaryExits
## - SyncAggregate
## ##
## However:
## - ETH1Data (Deposits) can contain invalid BLS signatures ## - ETH1Data (Deposits) can contain invalid BLS signatures
## ##
## The block state transition has NOT been verified ## The block state transition has NOT been verified
@ -373,7 +375,7 @@ type
voluntary_exits*: List[TrustedSignedVoluntaryExit, Limit MAX_VOLUNTARY_EXITS] voluntary_exits*: List[TrustedSignedVoluntaryExit, Limit MAX_VOLUNTARY_EXITS]
# [New in Altair] # [New in Altair]
sync_aggregate*: SyncAggregate # TODO TrustedSyncAggregate after batching sync_aggregate*: TrustedSyncAggregate
SyncnetBits* = BitArray[SYNC_COMMITTEE_SUBNET_COUNT] SyncnetBits* = BitArray[SYNC_COMMITTEE_SUBNET_COUNT]

View File

@ -380,12 +380,17 @@ type
message*: AggregateAndProof message*: AggregateAndProof
signature*: ValidatorSig signature*: ValidatorSig
SyncCommitteeCache* = object
current_sync_committee*: array[SYNC_COMMITTEE_SIZE, ValidatorIndex]
next_sync_committee*: array[SYNC_COMMITTEE_SIZE, ValidatorIndex]
# This doesn't know about forks or branches in the DAG. It's for straight, # This doesn't know about forks or branches in the DAG. It's for straight,
# linear chunks of the chain. # linear chunks of the chain.
StateCache* = object StateCache* = object
shuffled_active_validator_indices*: shuffled_active_validator_indices*:
Table[Epoch, seq[ValidatorIndex]] Table[Epoch, seq[ValidatorIndex]]
beacon_proposer_indices*: Table[Slot, Option[ValidatorIndex]] beacon_proposer_indices*: Table[Slot, Option[ValidatorIndex]]
sync_committees*: Table[SyncCommitteePeriod, SyncCommitteeCache]
# This matches the mutable state of the Solidity deposit contract # This matches the mutable state of the Solidity deposit contract
# https://github.com/ethereum/consensus-specs/blob/v1.1.2/solidity_deposit_contract/deposit_contract.sol # https://github.com/ethereum/consensus-specs/blob/v1.1.2/solidity_deposit_contract/deposit_contract.sol

View File

@ -40,10 +40,7 @@ proc toJsonHex(data: openArray[byte]): string =
proc fromJson*(n: JsonNode, argName: string, result: var ValidatorPubKey) {.raises: [Defect, ValueError].} = proc fromJson*(n: JsonNode, argName: string, result: var ValidatorPubKey) {.raises: [Defect, ValueError].} =
n.kind.expect(JString, argName) n.kind.expect(JString, argName)
var tmp = ValidatorPubKey.fromHex(n.getStr()).tryGet() result = ValidatorPubKey.fromHex(n.getStr()).tryGet()
if not tmp.loadWithCache().isSome():
raise (ref ValueError)(msg: "Invalid public BLS key")
result = tmp
proc `%`*(pubkey: ValidatorPubKey): JsonNode = proc `%`*(pubkey: ValidatorPubKey): JsonNode =
newJString(toJsonHex(toRaw(pubkey))) newJString(toJsonHex(toRaw(pubkey)))

View File

@ -414,4 +414,48 @@ proc collectSignatureSets*(
volex.message.epoch, volex.message.epoch,
DOMAIN_VOLUNTARY_EXIT) DOMAIN_VOLUNTARY_EXIT)
block:
# 7. SyncAggregate
# ----------------------------------------------------
withState(state):
when stateFork >= BeaconStateFork.Altair and
(signed_block is altair.SignedBeaconBlock or
signed_block is merge.SignedBeaconBlock):
let
current_sync_committee =
state.data.get_sync_committee_cache(cache).current_sync_committee
var inited = false
var attestersAgg{.noInit.}: AggregatePublicKey
for i in 0 ..< current_sync_committee.len:
if signed_block.message.body.sync_aggregate.sync_committee_bits[i]:
let key = validatorKeys.load(current_sync_committee[i])
if not key.isSome():
return err("Invalid key cache")
if not inited: # first iteration
attestersAgg.init(key.get())
inited = true
else:
attestersAgg.aggregate(key.get())
if not inited:
if signed_block.message.body.sync_aggregate.sync_committee_signature !=
default(CookedSig).toValidatorSig():
return err("process_sync_aggregate: empty sync aggregates need signature of point at infinity")
else:
let
attesters = finish(attestersAgg)
previous_slot = max(state.data.slot, Slot(1)) - 1
sigs.addSignatureSet(
attesters,
get_block_root_at_slot(state.data, previous_slot),
signed_block.message.body.sync_aggregate.sync_committee_signature.loadOrExit(
"process_sync_aggregate: cannot load signature"),
state.data.fork,
state.data.genesis_validators_root,
previous_slot.epoch,
DOMAIN_SYNC_COMMITTEE)
ok() ok()

View File

@ -434,19 +434,18 @@ proc process_sync_aggregate*(
# Verify sync committee aggregate signature signing over the previous slot # Verify sync committee aggregate signature signing over the previous slot
# block root # block root
let let
committee_pubkeys = state.current_sync_committee.pubkeys
previous_slot = max(state.slot, Slot(1)) - 1 previous_slot = max(state.slot, Slot(1)) - 1
domain = get_domain(state, DOMAIN_SYNC_COMMITTEE, compute_epoch_at_slot(previous_slot)) domain = get_domain(state, DOMAIN_SYNC_COMMITTEE, compute_epoch_at_slot(previous_slot))
signing_root = compute_signing_root(get_block_root_at_slot(state, previous_slot), domain) signing_root = compute_signing_root(get_block_root_at_slot(state, previous_slot), domain)
when aggregate.sync_committee_signature isnot TrustedSig: when aggregate.sync_committee_signature isnot TrustedSig:
var participant_pubkeys: seq[ValidatorPubKey] var participant_pubkeys: seq[ValidatorPubKey]
for i in 0 ..< committee_pubkeys.len: for i in 0 ..< state.current_sync_committee.pubkeys.len:
if aggregate.sync_committee_bits[i]: if aggregate.sync_committee_bits[i]:
participant_pubkeys.add committee_pubkeys[i] participant_pubkeys.add state.current_sync_committee.pubkeys[i]
# p2p-interface message validators check for empty sync committees, so it # p2p-interface message validators check for empty sync committees, so it
# shouldn't run except as part of test suite. # shouldn't run except as part of test suite.
if participant_pubkeys.len == 0 and if participant_pubkeys.len == 0 and
aggregate.sync_committee_signature != default(CookedSig).toValidatorSig(): aggregate.sync_committee_signature != default(CookedSig).toValidatorSig():
return err("process_sync_aggregate: empty sync aggregates need signature of point at infinity") return err("process_sync_aggregate: empty sync aggregates need signature of point at infinity")
@ -474,22 +473,13 @@ proc process_sync_aggregate*(
return err("process_sync_aggregate: no proposer") return err("process_sync_aggregate: no proposer")
# Apply participant and proposer rewards # Apply participant and proposer rewards
let indices = get_sync_committee_cache(state, cache).current_sync_committee
# stand-in to be replaced
# TODO obviously not viable as written
# TODO also, this could use the pubkey -> index map that's been approached a couple places
let s = toHashSet(state.current_sync_committee.pubkeys.data) # TODO leaking abstraction
var pubkeyIndices: Table[ValidatorPubKey, ValidatorIndex]
for i, v in state.validators:
if v.pubkey in s:
pubkeyIndices[v.pubkey] = i.ValidatorIndex
# TODO could use a sequtils2 zipIt # TODO could use a sequtils2 zipIt
for i in 0 ..< min( for i in 0 ..< min(
state.current_sync_committee.pubkeys.len, state.current_sync_committee.pubkeys.len,
aggregate.sync_committee_bits.len): aggregate.sync_committee_bits.len):
let participant_index = let participant_index = indices[i]
pubkeyIndices.getOrDefault(state.current_sync_committee.pubkeys.data[i])
if aggregate.sync_committee_bits[i]: if aggregate.sync_committee_bits[i]:
increase_balance(state, participant_index, participant_reward) increase_balance(state, participant_index, participant_reward)
increase_balance(state, proposer_index.get, proposer_reward) increase_balance(state, proposer_index.get, proposer_reward)

View File

@ -125,7 +125,7 @@ proc getAttachedValidator*(node: BeaconNode,
proc getAttachedValidator*(node: BeaconNode, proc getAttachedValidator*(node: BeaconNode,
state_validators: auto, state_validators: auto,
idx: ValidatorIndex): AttachedValidator = idx: ValidatorIndex): AttachedValidator =
if idx < state_validators.len.ValidatorIndex: if uint64(idx) < state_validators.lenu64:
let validator = node.getAttachedValidator(state_validators[idx].pubkey) let validator = node.getAttachedValidator(state_validators[idx].pubkey)
if validator != nil and validator.index != some(idx): if validator != nil and validator.index != some(idx):
# Update index, in case the validator was activated! # Update index, in case the validator was activated!
@ -235,16 +235,16 @@ proc sendSyncCommitteeMessages*(node: BeaconNode,
let (keysCur, keysNxt) = let (keysCur, keysNxt) =
block: block:
var resCur: Table[ValidatorPubKey, int] var resCur: Table[uint64, int]
var resNxt: Table[ValidatorPubKey, int] var resNxt: Table[uint64, int]
for index, msg in msgs.pairs(): for index, msg in msgs.pairs():
if msg.validator_index < lenu64(state.data.validators): if msg.validator_index < lenu64(state.data.validators):
let msgPeriod = sync_committee_period(msg.slot) let msgPeriod = sync_committee_period(msg.slot)
if msgPeriod == curPeriod: if msgPeriod == curPeriod:
resCur[state.data.validators.asSeq[msg.validator_index].pubkey] = index resCur[msg.validator_index] = index
elif msgPeriod == nextPeriod: elif msgPeriod == nextPeriod:
resNxt[state.data.validators.asSeq[msg.validator_index].pubkey] = index resNxt[msg.validator_index] = index
else: else:
statuses[index] = statuses[index] =
some(SendResult.err("Message's slot out of state's head range")) some(SendResult.err("Message's slot out of state's head range"))
@ -259,16 +259,16 @@ proc sendSyncCommitteeMessages*(node: BeaconNode,
var resIndices: seq[int] var resIndices: seq[int]
for committeeIdx in allSyncSubcommittees(): for committeeIdx in allSyncSubcommittees():
for valKey in syncSubcommittee( for valKey in syncSubcommittee(
state.data.current_sync_committee.pubkeys.data, committeeIdx): node.dag.headSyncCommittees.current_sync_committee, committeeIdx):
let index = keysCur.getOrDefault(valKey, -1) let index = keysCur.getOrDefault(uint64(valKey), -1)
if index >= 0: if index >= 0:
resIndices.add(index) resIndices.add(index)
resFutures.add(node.sendSyncCommitteeMessage(msgs[index], resFutures.add(node.sendSyncCommitteeMessage(msgs[index],
committeeIdx, true)) committeeIdx, true))
for committeeIdx in allSyncSubcommittees(): for committeeIdx in allSyncSubcommittees():
for valKey in syncSubcommittee( for valKey in syncSubcommittee(
state.data.next_sync_committee.pubkeys.data, committeeIdx): node.dag.headSyncCommittees.next_sync_committee, committeeIdx):
let index = keysNxt.getOrDefault(valKey, -1) let index = keysNxt.getOrDefault(uint64(valKey), -1)
if index >= 0: if index >= 0:
resIndices.add(index) resIndices.add(index)
resFutures.add(node.sendSyncCommitteeMessage(msgs[index], resFutures.add(node.sendSyncCommitteeMessage(msgs[index],
@ -676,11 +676,12 @@ proc createAndSendSyncCommitteeMessage(node: BeaconNode,
proc handleSyncCommitteeMessages(node: BeaconNode, head: BlockRef, slot: Slot) = proc handleSyncCommitteeMessages(node: BeaconNode, head: BlockRef, slot: Slot) =
# TODO Use a view type to avoid the copy # TODO Use a view type to avoid the copy
var syncCommittee = @(node.dag.syncCommitteeParticipants(slot + 1)) var syncCommittee = node.dag.syncCommitteeParticipants(slot + 1)
for committeeIdx in allSyncSubcommittees(): for committeeIdx in allSyncSubcommittees():
for valKey in syncSubcommittee(syncCommittee, committeeIdx): for valIdx in syncSubcommittee(syncCommittee, committeeIdx):
let validator = node.getAttachedValidator(valKey) let validator = node.getAttachedValidator(
getStateField(node.dag.headState.data, validators), valIdx)
if isNil(validator) or validator.index.isNone(): if isNil(validator) or validator.index.isNone():
continue continue
asyncSpawn createAndSendSyncCommitteeMessage(node, slot, validator, asyncSpawn createAndSendSyncCommitteeMessage(node, slot, validator,
@ -715,7 +716,7 @@ proc handleSyncCommitteeContributions(node: BeaconNode,
let let
fork = node.dag.forkAtEpoch(slot.epoch) fork = node.dag.forkAtEpoch(slot.epoch)
genesisValidatorsRoot = node.dag.genesisValidatorsRoot genesisValidatorsRoot = node.dag.genesisValidatorsRoot
syncCommittee = @(node.dag.syncCommitteeParticipants(slot + 1)) syncCommittee = node.dag.syncCommitteeParticipants(slot + 1)
type type
AggregatorCandidate = object AggregatorCandidate = object
@ -729,8 +730,9 @@ proc handleSyncCommitteeContributions(node: BeaconNode,
for subcommitteeIdx in allSyncSubcommittees(): for subcommitteeIdx in allSyncSubcommittees():
# TODO Hoist outside of the loop with a view type # TODO Hoist outside of the loop with a view type
# to avoid the repeated offset calculations # to avoid the repeated offset calculations
for valKey in syncSubcommittee(syncCommittee, subcommitteeIdx): for valIdx in syncSubcommittee(syncCommittee, subcommitteeIdx):
let validator = node.getAttachedValidator(valKey) let validator = node.getAttachedValidator(
getStateField(node.dag.headState.data, validators), valIdx)
if validator == nil: if validator == nil:
continue continue

View File

@ -69,7 +69,6 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
genesisTime = float getStateField(genesisState[], genesis_time) genesisTime = float getStateField(genesisState[], genesis_time)
var var
validatorKeyToIndex = initTable[ValidatorPubKey, int]()
cfg = defaultRuntimeConfig cfg = defaultRuntimeConfig
cfg.ALTAIR_FORK_EPOCH = 64.Slot.epoch cfg.ALTAIR_FORK_EPOCH = 64.Slot.epoch
@ -83,10 +82,6 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
ChainDAGRef.preInit(db, genesisState[], genesisState[], genesisBlock) ChainDAGRef.preInit(db, genesisState[], genesisState[], genesisBlock)
putInitialDepositContractSnapshot(db, depositContractSnapshot) putInitialDepositContractSnapshot(db, depositContractSnapshot)
withState(genesisState[]):
for i in 0 ..< state.data.validators.len:
validatorKeyToIndex[state.data.validators[i].pubkey] = i
var var
dag = ChainDAGRef.init(cfg, db, {}) dag = ChainDAGRef.init(cfg, db, {})
eth1Chain = Eth1Chain.init(cfg, db) eth1Chain = Eth1Chain.init(cfg, db)
@ -144,7 +139,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
type type
Aggregator = object Aggregator = object
subcommitteeIdx: SyncSubcommitteeIndex subcommitteeIdx: SyncSubcommitteeIndex
validatorIdx: int validatorIdx: ValidatorIndex
selectionProof: ValidatorSig selectionProof: ValidatorSig
let let
@ -159,13 +154,12 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
var aggregators: seq[Aggregator] var aggregators: seq[Aggregator]
for subcommitteeIdx in allSyncSubcommittees(): for subcommitteeIdx in allSyncSubcommittees():
for valKey in syncSubcommittee(syncCommittee, subcommitteeIdx): for validatorIdx in syncSubcommittee(syncCommittee, subcommitteeIdx):
if rand(r, 1.0) > syncCommitteeRatio: if rand(r, 1.0) > syncCommitteeRatio:
continue continue
let let
validatorIdx = validatorKeyToIndex[valKey] validarorPrivKey = MockPrivKeys[validatorIdx]
validarorPrivKey = MockPrivKeys[validatorIdx.ValidatorIndex]
signature = blsSign(validarorPrivKey, signingRoot.data) signature = blsSign(validarorPrivKey, signingRoot.data)
msg = SyncCommitteeMessage( msg = SyncCommitteeMessage(
slot: slot, slot: slot,
@ -390,7 +384,6 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
for i in 0 ..< newDeposits: for i in 0 ..< newDeposits:
let validatorIdx = merkleizer.getChunkCount.int let validatorIdx = merkleizer.getChunkCount.int
let d = makeDeposit(validatorIdx, {skipBLSValidation}) let d = makeDeposit(validatorIdx, {skipBLSValidation})
validatorKeyToIndex[d.pubkey] = validatorIdx
eth1Block.deposits.add d eth1Block.deposits.add d
merkleizer.addChunk hash_tree_root(d).data merkleizer.addChunk hash_tree_root(d).data

View File

@ -14,7 +14,7 @@ import
stew/assign2, stew/assign2,
eth/keys, taskpools, eth/keys, taskpools,
../beacon_chain/spec/datatypes/base, ../beacon_chain/spec/datatypes/base,
../beacon_chain/spec/[beaconstate, forks, helpers, state_transition], ../beacon_chain/spec/[beaconstate, forks, helpers, signatures, state_transition],
../beacon_chain/[beacon_chain_db], ../beacon_chain/[beacon_chain_db],
../beacon_chain/consensus_object_pools/[ ../beacon_chain/consensus_object_pools/[
attestation_pool, blockchain_dag, block_quarantine, block_clearance], attestation_pool, blockchain_dag, block_quarantine, block_clearance],
@ -23,6 +23,10 @@ import
func `$`(x: BlockRef): string = func `$`(x: BlockRef): string =
$x.root $x.root
const
nilPhase0Callback = OnPhase0BlockAdded(nil)
nilAltairCallback = OnAltairBlockAdded(nil)
proc pruneAtFinalization(dag: ChainDAGRef) = proc pruneAtFinalization(dag: ChainDAGRef) =
if dag.needStateCachesAndForkChoicePruning(): if dag.needStateCachesAndForkChoicePruning():
dag.pruneStateCachesDAG() dag.pruneStateCachesDAG()
@ -121,7 +125,6 @@ suite "Block pool processing" & preset():
dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
taskpool = Taskpool.new() taskpool = Taskpool.new()
quarantine = QuarantineRef.init(keys.newRng(), taskpool) quarantine = QuarantineRef.init(keys.newRng(), taskpool)
nilPhase0Callback: OnPhase0BlockAdded
state = newClone(dag.headState.data) state = newClone(dag.headState.data)
cache = StateCache() cache = StateCache()
info = ForkedEpochInfo() info = ForkedEpochInfo()
@ -340,7 +343,84 @@ suite "Block pool processing" & preset():
tmpState.blck == b1Add[].parent tmpState.blck == b1Add[].parent
getStateField(tmpState.data, slot) == bs1.parent.slot getStateField(tmpState.data, slot) == bs1.parent.slot
const nilPhase0Callback = OnPhase0BlockAdded(nil) suite "Block pool altair processing" & preset():
setup:
var
cfg = defaultRuntimeConfig
cfg.ALTAIR_FORK_EPOCH = Epoch(1)
var
db = makeTestDB(SLOTS_PER_EPOCH)
dag = init(ChainDAGRef, cfg, db, {})
taskpool = Taskpool.new()
quarantine = QuarantineRef.init(keys.newRng(), taskpool)
nilAltairCallback: OnAltairBlockAdded
state = newClone(dag.headState.data)
cache = StateCache()
info = ForkedEpochInfo()
# Advance to altair
check:
process_slots(
cfg, state[], cfg.ALTAIR_FORK_EPOCH.compute_start_slot_at_epoch(), cache,
info, {})
state[].kind == BeaconStateFork.Altair
var
b1 = addTestBlock(state[], cache).altairData
att1 = makeFullAttestations(state[], b1.root, b1.message.slot, cache)
b2 = addTestBlock(state[], cache, attestations = att1).altairData
test "Invalid signatures" & preset():
let badSignature = get_slot_signature(
Fork(), Eth2Digest(), 42.Slot,
MockPrivKeys[ValidatorIndex(0)]).toValidatorSig()
check:
dag.addRawBlock(quarantine, b1, nilAltairCallback).isOk()
block: # Main signature
var b = b2
b.signature = badSignature
let
bAdd = dag.addRawBlock(quarantine, b, nilAltairCallback)
check:
bAdd.error() == (ValidationResult.Reject, Invalid)
block: # Randao reveal
var b = b2
b.message.body.randao_reveal = badSignature
let
bAdd = dag.addRawBlock(quarantine, b, nilAltairCallback)
check:
bAdd.error() == (ValidationResult.Reject, Invalid)
block: # Attestations
var b = b2
b.message.body.attestations[0].signature = badSignature
let
bAdd = dag.addRawBlock(quarantine, b, nilAltairCallback)
check:
bAdd.error() == (ValidationResult.Reject, Invalid)
block: # SyncAggregate empty
var b = b2
b.message.body.sync_aggregate.sync_committee_signature = badSignature
let
bAdd = dag.addRawBlock(quarantine, b, nilAltairCallback)
check:
bAdd.error() == (ValidationResult.Reject, Invalid)
block: # SyncAggregate junk
var b = b2
b.message.body.sync_aggregate.sync_committee_signature = badSignature
b.message.body.sync_aggregate.sync_committee_bits[0] = true
let
bAdd = dag.addRawBlock(quarantine, b, nilAltairCallback)
check:
bAdd.error() == (ValidationResult.Reject, Invalid)
suite "chain DAG finalization tests" & preset(): suite "chain DAG finalization tests" & preset():
setup: setup:
@ -349,7 +429,6 @@ suite "chain DAG finalization tests" & preset():
dag = init(ChainDAGRef, defaultRuntimeConfig, db, {}) dag = init(ChainDAGRef, defaultRuntimeConfig, db, {})
taskpool = Taskpool.new() taskpool = Taskpool.new()
quarantine = QuarantineRef.init(keys.newRng(), taskpool) quarantine = QuarantineRef.init(keys.newRng(), taskpool)
nilPhase0Callback: OnPhase0BlockAdded
cache = StateCache() cache = StateCache()
info = ForkedEpochInfo() info = ForkedEpochInfo()
@ -586,7 +665,6 @@ suite "Diverging hardforks":
dag = init(ChainDAGRef, phase0RuntimeConfig, db, {}) dag = init(ChainDAGRef, phase0RuntimeConfig, db, {})
taskpool = Taskpool.new() taskpool = Taskpool.new()
quarantine = QuarantineRef.init(keys.newRng(), taskpool) quarantine = QuarantineRef.init(keys.newRng(), taskpool)
nilPhase0Callback: OnPhase0BlockAdded
state = newClone(dag.headState.data) state = newClone(dag.headState.data)
cache = StateCache() cache = StateCache()
info = ForkedEpochInfo() info = ForkedEpochInfo()

View File

@ -69,10 +69,7 @@ suite "Gossip validation " & preset():
committeeLen(10000) == 0 committeeLen(10000) == 0
committeeLen(uint64.high) == 0 committeeLen(uint64.high) == 0
test "Validation sanity": test "validateAttestation":
# TODO: refactor tests to avoid skipping BLS validation
dag.updateFlags.incl {skipBLSValidation}
var var
cache: StateCache cache: StateCache
for blck in makeTestBlocks( for blck in makeTestBlocks(
@ -210,11 +207,9 @@ suite "Gossip validation - Extra": # Not based on preset config
subcommitteeIdx = 0.SyncSubcommitteeIndex subcommitteeIdx = 0.SyncSubcommitteeIndex
syncCommittee = @(dag.syncCommitteeParticipants(slot)) syncCommittee = @(dag.syncCommitteeParticipants(slot))
subcommittee = toSeq(syncCommittee.syncSubcommittee(subcommitteeIdx)) subcommittee = toSeq(syncCommittee.syncSubcommittee(subcommitteeIdx))
index = subcommittee[0]
pubkey = subcommittee[0] expectedCount = subcommittee.count(index)
expectedCount = subcommittee.count(pubkey) pubkey = state[].data.validators[index].pubkey
index = ValidatorIndex(
state[].data.validators.mapIt(it.pubkey).find(pubKey))
privateItem = ValidatorPrivateItem(privateKey: MockPrivKeys[index]) privateItem = ValidatorPrivateItem(privateKey: MockPrivKeys[index])
validator = AttachedValidator(pubKey: pubkey, validator = AttachedValidator(pubKey: pubkey,
kind: ValidatorKind.Local, data: privateItem, index: some(index)) kind: ValidatorKind.Local, data: privateItem, index: some(index))