sync committee message pool and gossip validation (#2830)

This commit is contained in:
tersec 2021-08-28 10:40:01 +00:00 committed by GitHub
parent eeba2869fc
commit 166e22a43b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 467 additions and 1 deletions

View File

@ -977,6 +977,105 @@ proc pruneBlocksDAG(dag: ChainDAGRef) =
prunedHeads = hlen - dag.heads.len,
dagPruneDur = Moment.now() - startTick
func syncSubcommittee*(syncCommittee: openarray[ValidatorPubKey],
                       committeeIdx: SyncCommitteeIndex): seq[ValidatorPubKey] =
  ## Return the pubkeys of the subcommittee selected by `committeeIdx`:
  ## the `SYNC_SUBCOMMITTEE_SIZE`-sized slice of `syncCommittee` starting
  ## at `committeeIdx * SYNC_SUBCOMMITTEE_SIZE`. Empty input yields `@[]`.
  ##
  ## TODO Return a view type
  ##      Unfortunately, this doesn't work as a template right now.
  if syncCommittee.len == 0:
    return @[]

  let
    startIdx = committeeIdx.asInt * SYNC_SUBCOMMITTEE_SIZE
    onePastEndIdx = startIdx + SYNC_SUBCOMMITTEE_SIZE

  # The original assertion only covered `startIdx`; also require the end of
  # the slice to be in range so `toOpenArray` cannot fail at runtime when
  # handed a committee that is shorter than a full set of subcommittees.
  doAssert startIdx < syncCommittee.len
  doAssert onePastEndIdx <= syncCommittee.len

  @(toOpenArray(syncCommittee, startIdx, onePastEndIdx - 1))
func syncCommitteeParticipants*(dagParam: ChainDAGRef,
                                slotParam: Slot): seq[ValidatorPubKey] =
  ## Pubkeys of the full sync committee covering `slotParam`, judged from
  ## the head state; an empty seq when the head is not on Altair or the
  ## slot falls before the head's committee period.
  # TODO:
  # Use view types in Nim 1.6
  # Right now, the compiler is not able to handle turning this into a
  # template and returning an openarray
  let
    dag = dagParam
    slot = slotParam

  if dag.headState.data.beaconStateFork != forkAltair:
    return @[]

  let
    headSlot = dag.headState.data.hbsAltair.data.slot
    headCommitteePeriod = syncCommitteePeriod(headSlot)
    periodStart = syncCommitteePeriodStartSlot(headCommitteePeriod)
    nextPeriodStart = periodStart + SLOTS_PER_SYNC_COMMITTEE_PERIOD

  if slot >= nextPeriodStart:
    @(dag.headState.data.hbsAltair.data.next_sync_committee.pubkeys.data)
  elif slot >= periodStart:
    @(dag.headState.data.hbsAltair.data.current_sync_committee.pubkeys.data)
  else:
    @[]
func getSubcommitteePositionAux*(
    dag: ChainDAGRef,
    syncCommittee: openarray[ValidatorPubKey],
    committeeIdx: SyncCommitteeIndex,
    validatorIdx: uint64): Option[uint64] =
  ## Linear scan of the `committeeIdx` subcommittee for the key belonging
  ## to `validatorIdx`; `none` when the validator is unknown to the DAG or
  ## is not a member of that subcommittee.
  # TODO Can we avoid the key conversions by getting a compressed key
  #      out of ImmutableValidatorData2? If we had this, we can define
  #      the function `dag.validatorKeyBytes` and use it here.
  let maybeKey = dag.validatorKey(validatorIdx)
  if maybeKey.isNone():
    return none(uint64)

  let wanted = maybeKey.get().toPubKey
  for position, memberKey in syncCommittee.syncSubcommittee(committeeIdx):
    if memberKey == wanted:
      return some uint64(position)

  none(uint64)
func getSubcommitteePosition*(dag: ChainDAGRef,
                              slot: Slot,
                              committeeIdx: SyncCommitteeIndex,
                              validatorIdx: uint64): Option[uint64] =
  ## Position of `validatorIdx` within subcommittee `committeeIdx` of the
  ## sync committee covering `slot`; `none` on phase0 heads, on slots
  ## before the head's committee period, and for non-members.
  if dag.headState.data.beaconStateFork == forkPhase0:
    return none(uint64)

  let
    headSlot = dag.headState.data.hbsAltair.data.slot
    headCommitteePeriod = syncCommitteePeriod(headSlot)
    periodStart = syncCommitteePeriodStartSlot(headCommitteePeriod)
    nextPeriodStart = periodStart + SLOTS_PER_SYNC_COMMITTEE_PERIOD

  # Small shorthand so both committee lookups below read the same way.
  template search(syncCommittee: openarray[ValidatorPubKey]): Option[uint64] =
    dag.getSubcommitteePositionAux(syncCommittee, committeeIdx, validatorIdx)

  if slot < periodStart:
    none(uint64)
  elif slot >= nextPeriodStart:
    search(dag.headState.data.hbsAltair.data.next_sync_committee.pubkeys.data)
  else:
    search(dag.headState.data.hbsAltair.data.current_sync_committee.pubkeys.data)
template syncCommitteeParticipants*(
    dag: ChainDAGRef,
    slot: Slot,
    committeeIdx: SyncCommitteeIndex): seq[ValidatorPubKey] =
  ## Pubkeys of the `committeeIdx` subcommittee of the sync committee at
  ## `slot`; empty when no sync committee is known for that slot.
  let
    participants = dag.syncCommitteeParticipants(slot)
    startIdx = committeeIdx.asInt * SYNC_SUBCOMMITTEE_SIZE
    onePastEndIdx = startIdx + SYNC_SUBCOMMITTEE_SIZE
  # TODO Nim is not happy with returning an openarray here
  if participants.len == 0:
    # `dag.syncCommitteeParticipants(slot)` returns `@[]` e.g. for phase0
    # heads; slicing an empty seq with `toOpenArray` would raise a range
    # error, so short-circuit to an empty result instead.
    newSeq[ValidatorPubKey]()
  else:
    @(toOpenArray(participants, startIdx, onePastEndIdx - 1))
iterator syncCommitteeParticipants*(
    dag: ChainDAGRef,
    slot: Slot,
    committeeIdx: SyncCommitteeIndex,
    aggregationBits: SyncCommitteeAggregationBits): ValidatorPubKey =
  ## Yield the pubkey of every member of subcommittee `committeeIdx` whose
  ## bit is set in `aggregationBits`.
  for bitPos, pubkey in pairs(dag.syncCommitteeParticipants(slot, committeeIdx)):
    if aggregationBits[bitPos]:
      yield pubkey
func needStateCachesAndForkChoicePruning*(dag: ChainDAGRef): bool =
  ## True when finalization has advanced past the last recorded prune
  ## point, i.e. state caches and fork choice hold stale data to drop.
  dag.lastPrunePoint != dag.finalizedHead

View File

@ -0,0 +1,164 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
std/sets,
chronicles,
../spec/digest,
../spec/datatypes/altair,
../beacon_node_types,
./block_pools_types
func init*(T: type SyncCommitteeMsgPool): SyncCommitteeMsgPool =
  ## All fields of the pool start from their zero defaults (empty
  ## sets/tables), so no explicit initialization is required.
  discard
func init(T: type SyncAggregate): SyncAggregate =
  ## An "empty" aggregate: participation bits stay at their zero default
  ## and the signature is set to the point-at-infinity.
  var empty: SyncAggregate
  empty.sync_committee_signature = ValidatorSig.infinity
  empty
func clearPerSlotData*(pool: var SyncCommitteeMsgPool) =
  ## Reset the per-slot duplicate-detection sets so that, in the next slot,
  ## the same authors may contribute fresh messages and aggregates again.
  clear pool.seenAggregateByAuthor
  clear pool.seenByAuthor
  # TODO The previously implemented pruning has proven to be too
  #      aggressive. We can consider a scheme where the data is pruned
  #      with several slots of delay to allow for late sync committee
  #      messages.
  # clear pool.bestAggregates
  # clear pool.blockVotes
func addSyncCommitteeMsg*(
    pool: var SyncCommitteeMsgPool,
    slot: Slot,
    beaconBlockRoot: Eth2Digest,
    signature: CookedSig,
    committeeIdx: SyncCommitteeIndex,
    positionInCommittee: uint64) =
  ## Record an already-validated sync committee message in the pool,
  ## grouped by the block root it votes for.
  let trustedMsg = TrustedSyncCommitteeMsg(
    slot: slot,
    committeeIdx: committeeIdx,
    positionInCommittee: positionInCommittee,
    signature: signature)
  pool.blockVotes.mgetOrPut(beaconBlockRoot, @[]).add trustedMsg
func computeAggregateSig(votes: seq[TrustedSyncCommitteeMsg],
                         committeeIdx: SyncCommitteeIndex,
                         contribution: var SyncCommitteeContribution): bool =
  ## Fold all votes for `committeeIdx` into `contribution`'s signature and
  ## aggregation bits; false when no vote matched (contribution signature
  ## is then left untouched).
  var
    aggSig {.noInit.}: AggregateSignature
    seenAny = false

  for vote in votes:
    if vote.committeeIdx != committeeIdx:
      continue

    if seenAny:
      aggSig.aggregate(vote.signature)
    else:
      aggSig.init(vote.signature)
      seenAny = true

    contribution.aggregation_bits.setBit vote.positionInCommittee

  if seenAny:
    contribution.signature = aggSig.finish.toValidatorSig

  seenAny
func produceContribution*(
    pool: SyncCommitteeMsgPool,
    slot: Slot,
    head: BlockRef,
    committeeIdx: SyncCommitteeIndex,
    outContribution: var SyncCommitteeContribution): bool =
  ## Populate `outContribution` by aggregating the pooled messages voting
  ## for `head` in subcommittee `committeeIdx`; false when there is
  ## nothing to aggregate.
  if head.root notin pool.blockVotes:
    return false

  outContribution.slot = slot
  outContribution.beacon_block_root = head.root
  outContribution.subcommittee_index = committeeIdx.asUInt64

  try:
    computeAggregateSig(pool.blockVotes[head.root],
                        committeeIdx,
                        outContribution)
  except KeyError:
    raiseAssert "We have checked for the key upfront"
func addAggregateAux(bestVotes: var BestSyncSubcommitteeContributions,
                     contribution: SyncCommitteeContribution) =
  ## Keep `contribution` only if it carries more participants than the
  ## best contribution recorded so far for its subcommittee.
  let totalParticipants = countOnes(contribution.aggregation_bits)
  # NOTE(review): `.load.get` below re-parses the signature and would raise
  # on a signature that fails to load, whereas the caller
  # (`addSyncContribution`) already holds the loaded `CookedSig` — consider
  # passing it in instead; confirm the signature was validated upstream.
  if totalParticipants > bestVotes[contribution.subcommittee_index].totalParticipants:
    bestVotes[contribution.subcommittee_index] =
      BestSyncSubcommitteeContribution(
        totalParticipants: totalParticipants,
        participationBits: contribution.aggregation_bits,
        signature: contribution.signature.load.get)
func addSyncContribution*(
    pool: var SyncCommitteeMsgPool,
    contribution: SyncCommitteeContribution,
    signature: CookedSig) =
  ## Record an aggregate contribution, keeping only the one with the most
  ## participants per (block root, subcommittee) pair.
  template blockRoot: auto = contribution.beacon_block_root

  if blockRoot in pool.bestAggregates:
    try:
      addAggregateAux(pool.bestAggregates[blockRoot], contribution)
    except KeyError:
      raiseAssert "We have checked for the key upfront"
  else:
    # First contribution for this block root: it is the best by definition.
    var freshBest: BestSyncSubcommitteeContributions
    freshBest[contribution.subcommittee_index] =
      BestSyncSubcommitteeContribution(
        totalParticipants: countOnes(contribution.aggregation_bits),
        participationBits: contribution.aggregation_bits,
        signature: signature)
    pool.bestAggregates[blockRoot] = freshBest
proc produceSyncAggregateAux(votes: BestSyncSubcommitteeContributions): SyncAggregate =
  ## Merge the best contribution of every subcommittee into a single
  ## `SyncAggregate`: union the participation bits and aggregate the
  ## per-subcommittee signatures (infinity when no subnet contributed).
  let t0 = Moment.now
  var
    combined {.noInit.}: AggregateSignature
    haveSig = false

  for subnetId in 0 ..< SYNC_COMMITTEE_SUBNET_COUNT:
    if votes[subnetId].totalParticipants == 0:
      continue

    # Translate subcommittee-local bit positions to committee-wide ones.
    for pos, isSet in votes[subnetId].participationBits:
      if isSet:
        result.sync_committee_bits.setBit(subnetId * SYNC_SUBCOMMITTEE_SIZE + pos)

    if haveSig:
      combined.aggregate(votes[subnetId].signature)
    else:
      combined.init(votes[subnetId].signature)
      haveSig = true

  result.sync_committee_signature =
    if haveSig:
      combined.finish.toValidatorSig
    else:
      ValidatorSig.infinity

  let duration = Moment.now - t0
  debug "SyncAggregate produced", duration,
        bits = result.sync_committee_bits
proc produceSyncAggregate*(
    pool: SyncCommitteeMsgPool,
    target: BlockRef): SyncAggregate =
  ## Best-known `SyncAggregate` for `target`; an empty aggregate when the
  ## pool holds no contributions for that block.
  if target.root notin pool.bestAggregates:
    return SyncAggregate.init()

  try:
    produceSyncAggregateAux(pool.bestAggregates[target.root])
  except KeyError:
    raiseAssert "We have checked for the key upfront"

View File

@ -18,7 +18,8 @@ import
../spec/[
beaconstate, state_transition_block, forks, helpers, network, signatures],
../consensus_object_pools/[
attestation_pool, blockchain_dag, block_quarantine, exit_pool, spec_cache],
attestation_pool, blockchain_dag, block_quarantine, exit_pool, spec_cache,
sync_committee_msg_pool],
".."/[beacon_node_types, beacon_clock],
./batch_validation
@ -748,3 +749,205 @@ proc validateVoluntaryExit*(
signed_voluntary_exit, VOLUNTARY_EXITS_BOUND)
ok()
# https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.8/specs/altair/p2p-interface.md#sync_committee_subnet_id
proc validateSyncCommitteeMessage*(
    dag: ChainDAGRef,
    syncCommitteeMsgPool: SyncCommitteeMsgPoolRef,
    msg: SyncCommitteeMessage,
    syncCommitteeIdx: SyncCommitteeIndex,
    wallTime: BeaconTime,
    checkSignature: bool):
    Result[void, (ValidationResult, cstring)] =
  ## Apply the Altair gossip-validation rules to an individual sync
  ## committee message; on success the message is added to
  ## `syncCommitteeMsgPool`.
  block:
    # [IGNORE] The signature's slot is for the current slot
    # (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance)
    # i.e. sync_committee_message.slot == current_slot.
    let res = check_propagation_slot_range(msg.slot, wallTime)
    if res.isErr:
      return res

  # [REJECT] The subnet_id is valid for the given validator
  # i.e. subnet_id in compute_subnets_for_sync_committee(state, sync_committee_message.validator_index).
  # Note this validation implies the validator is part of the broader
  # current sync committee along with the correct subcommittee.
  # This check also ensures that the validator index is in range
  let positionInSubcommittee = dag.getSubcommitteePosition(
    msg.slot + 1, syncCommitteeIdx, msg.validator_index)

  if positionInSubcommittee.isNone:
    return errReject(
      "SyncCommitteeMessage: originator not part of sync committee")

  block:
    # [IGNORE] There has been no other valid sync committee signature for the
    # declared slot for the validator referenced by sync_committee_message.validator_index
    # (this requires maintaining a cache of size SYNC_COMMITTEE_SIZE // SYNC_COMMITTEE_SUBNET_COUNT
    # for each subnet that can be flushed after each slot).
    #
    # Note this validation is per topic so that for a given slot, multiple
    # messages could be forwarded with the same validator_index as long as
    # the subnet_ids are distinct.
    let msgKey = SyncCommitteeMsgKey(
      originator: msg.validator_index.ValidatorIndex,
      slot: msg.slot,
      committeeIdx: syncCommitteeIdx)

    if msgKey in syncCommitteeMsgPool.seenByAuthor:
      # Fix: this rule is marked [IGNORE] in the spec, and the sibling
      # `validateSignedContributionAndProof` already treats its duplicate
      # check as IGNORE — rejecting here would wrongly penalize peers that
      # merely forwarded a benign duplicate.
      return errIgnore("SyncCommitteeMessage: duplicate message")
    else:
      syncCommitteeMsgPool.seenByAuthor.incl msgKey

  block:
    # [REJECT] The signature is valid for the message beacon_block_root for the
    # validator referenced by validator_index.
    let
      epoch = msg.slot.epoch
      fork = dag.forkAtEpoch(epoch)
      genesisValidatorsRoot = dag.genesisValidatorsRoot
      senderPubKey = dag.validatorKey(msg.validator_index)

    if senderPubKey.isNone():
      return errReject("SyncCommitteeMessage: invalid validator index")

    var cookedSignature = msg.signature.load
    if cookedSignature.isNone:
      return errReject("SyncCommitteeMessage: signature fails to load")

    if checkSignature and
       not verify_sync_committee_message_signature(epoch,
                                                   msg.beacon_block_root,
                                                   fork, genesisValidatorsRoot,
                                                   senderPubKey.get(),
                                                   cookedSignature.get):
      return errReject("SyncCommitteeMessage: signature fails to verify")

    # All checks passed - store the message for later aggregation.
    syncCommitteeMsgPool[].addSyncCommitteeMsg(
      msg.slot,
      msg.beacon_block_root,
      cookedSignature.get,
      syncCommitteeIdx,
      positionInSubcommittee.get)

  ok()
# https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.8/specs/altair/p2p-interface.md#sync_committee_contribution_and_proof
proc validateSignedContributionAndProof*(
    dag: ChainDAGRef,
    syncCommitteeMsgPool: SyncCommitteeMsgPoolRef,
    msg: SignedContributionAndProof,
    wallTime: BeaconTime,
    checkSignature: bool):
    Result[void, (ValidationResult, cstring)] =
  ## Apply the Altair gossip-validation rules to an aggregated sync
  ## committee contribution-and-proof; on success the contribution is
  ## added to `syncCommitteeMsgPool`.
  # [IGNORE] The contribution's slot is for the current slot
  # (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance)
  # i.e. contribution.slot == current_slot.
  ? check_propagation_slot_range(msg.message.contribution.slot, wallTime)

  # Resolve the aggregator's key first; this also bounds-checks the index.
  let
    aggregatorPubKey = dag.validatorKey(msg.message.aggregator_index)
  if aggregatorPubKey.isNone():
    return errReject("SignedContributionAndProof: invalid aggregator index")

  # [REJECT] The subcommittee index is in the allowed range
  # i.e. contribution.subcommittee_index < SYNC_COMMITTEE_SUBNET_COUNT.
  let committeeIdx = msg.message.contribution.subcommittee_index.validateSyncCommitteeIndexOr:
    return errReject("SignedContributionAndProof: subcommittee index too high")

  # [REJECT] contribution_and_proof.selection_proof selects the validator as an aggregator for the slot
  # i.e. is_sync_committee_aggregator(contribution_and_proof.selection_proof) returns True.
  if not is_sync_committee_aggregator(msg.message.selection_proof):
    return err((ValidationResult.Reject, cstring(
      "SignedContributionAndProof: invalid selection_proof")))

  block:
    # [IGNORE] The sync committee contribution is the first valid contribution
    # received for the aggregator with index contribution_and_proof.aggregator_index
    # for the slot contribution.slot and subcommittee index contribution.subcommittee_index
    # (this requires maintaining a cache of size SYNC_COMMITTEE_SIZE for this
    # topic that can be flushed after each slot).
    let msgKey = SyncCommitteeMsgKey(
      originator: msg.message.aggregator_index.ValidatorIndex,
      slot: msg.message.contribution.slot,
      committeeIdx: committeeIdx)

    if msgKey in syncCommitteeMsgPool.seenAggregateByAuthor:
      return errIgnore("SignedContributionAndProof: duplicate aggregation")

    syncCommitteeMsgPool.seenAggregateByAuthor.incl msgKey

  block:
    # [REJECT] The aggregator's validator index is in the declared subcommittee
    # of the current sync committee.
    # i.e. state.validators[contribution_and_proof.aggregator_index].pubkey in
    # get_sync_subcommittee_pubkeys(state, contribution.subcommittee_index).
    let
      epoch = msg.message.contribution.slot.epoch
      fork = dag.forkAtEpoch(epoch)
      genesisValidatorsRoot = dag.genesisValidatorsRoot

    # [REJECT] The aggregator signature, signed_contribution_and_proof.signature, is valid
    if not verify_signed_contribution_and_proof_signature(msg, fork,
                                                          genesisValidatorsRoot,
                                                          aggregatorPubKey.get()):
      return errReject(
        "SignedContributionAndProof: aggregator signature fails to verify")

    # [REJECT] The contribution_and_proof.selection_proof is a valid signature of the
    # SyncAggregatorSelectionData derived from the contribution by the validator with
    # index contribution_and_proof.aggregator_index.
    if not verify_selection_proof_signature(msg.message, fork,
                                            genesisValidatorsRoot,
                                            aggregatorPubKey.get()):
      return errReject(
        "SignedContributionAndProof: selection proof signature fails to verify")

    # [REJECT] The aggregate signature is valid for the message beacon_block_root
    # and aggregate pubkey derived from the participation info in aggregation_bits
    # for the subcommittee specified by the contribution.subcommittee_index.
    var
      committeeAggKey {.noInit.}: AggregatePublicKey
      initialized = false
      mixedKeys = 0   # diagnostic count of keys folded into the aggregate

    # NOTE(review): `slot + 1` mirrors the lookup in
    # `validateSyncCommitteeMessage` — presumably the contribution signs the
    # previous slot's block root; confirm against the spec helpers.
    for validatorPubKey in dag.syncCommitteeParticipants(
        msg.message.contribution.slot + 1,
        committeeIdx,
        msg.message.contribution.aggregation_bits):
      let validatorPubKey = validatorPubKey.loadWithCache.get
      if not initialized:
        initialized = true
        committeeAggKey.init(validatorPubKey)
        inc mixedKeys
      else:
        inc mixedKeys
        committeeAggKey.aggregate(validatorPubKey)

    if not initialized:
      # [REJECT] The contribution has participants
      # that is, any(contribution.aggregation_bits).
      return errReject("SignedContributionAndProof: aggregation bits empty")

    let cookedSignature = msg.message.contribution.signature.load
    if cookedSignature.isNone:
      return errReject(
        "SignedContributionAndProof: aggregate signature fails to load")

    if checkSignature and
       not verify_sync_committee_message_signature(
         epoch, msg.message.contribution.beacon_block_root, fork,
         genesisValidatorsRoot, committeeAggKey.finish, cookedSignature.get):
      debug "failing_sync_contribution",
        slot = msg.message.contribution.slot + 1,
        subnet = committeeIdx,
        participants = $(msg.message.contribution.aggregation_bits),
        mixedKeys

      return errReject(
        "SignedContributionAndProof: aggregate signature fails to verify")

    # All checks passed - keep the best contribution per subcommittee.
    syncCommitteeMsgPool[].addSyncContribution(
      msg.message.contribution,
      cookedSignature.get)

  ok()