handle duplicate pubkeys in sync committee (#2902)

When sync committee message handling was introduced in #2830, the edge
case of the same validator being selected multiple times as part of a
sync subcommittee was not covered. Not handling that edge case makes
sync contributions have a lower-than-expected participation rate as each
sync validator is only counted up through once per subcommittee.
This patch ensures that this edge case is properly covered.
This commit is contained in:
Etan Kissling 2021-09-28 09:44:20 +02:00 committed by GitHub
parent f2df006ee9
commit 01a9b275ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 101 additions and 23 deletions

View File

@ -108,6 +108,11 @@ OK: 4/4 Fail: 0/4 Skip: 0/4
+ Validation sanity OK
```
OK: 2/2 Fail: 0/2 Skip: 0/2
## Gossip validation - Extra
```diff
+ validateSyncCommitteeMessage OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
## Honest validator
```diff
+ General pubsub topics OK
@ -354,4 +359,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 36/48 Fail: 0/48 Skip: 12/48
---TOTAL---
OK: 192/204 Fail: 0/204 Skip: 12/204
OK: 193/205 Fail: 0/205 Skip: 12/205

View File

@ -1054,29 +1054,29 @@ func syncCommitteeParticipants*(dagParam: ChainDAGRef,
else:
@[]
func getSubcommitteePositionAux*(
func getSubcommitteePositionsAux(
dag: ChainDAGRef,
syncCommittee: openarray[ValidatorPubKey],
committeeIdx: SyncCommitteeIndex,
validatorIdx: uint64): Option[uint64] =
validatorIdx: uint64): seq[uint64] =
# TODO Can we avoid the key conversions by getting a compressed key
# out of ImmutableValidatorData2? If we had this, we can define
# the function `dag.validatorKeyBytes` and use it here.
let validatorKey = dag.validatorKey(validatorIdx)
if validatorKey.isNone():
return
return @[]
let validatorPubKey = validatorKey.get().toPubKey
for pos, key in syncCommittee.syncSubcommittee(committeeIdx):
if validatorPubKey == key:
return some uint64(pos)
result.add uint64(pos)
func getSubcommitteePosition*(dag: ChainDAGRef,
slot: Slot,
committeeIdx: SyncCommitteeIndex,
validatorIdx: uint64): Option[uint64] =
func getSubcommitteePositions*(dag: ChainDAGRef,
slot: Slot,
committeeIdx: SyncCommitteeIndex,
validatorIdx: uint64): seq[uint64] =
if dag.headState.data.beaconStateFork == forkPhase0:
return
return @[]
let
headSlot = dag.headState.data.hbsAltair.data.slot
@ -1084,11 +1084,11 @@ func getSubcommitteePosition*(dag: ChainDAGRef,
periodStart = syncCommitteePeriodStartSlot(headCommitteePeriod)
nextPeriodStart = periodStart + SLOTS_PER_SYNC_COMMITTEE_PERIOD
template search(syncCommittee: openarray[ValidatorPubKey]): Option[uint64] =
dag.getSubcommitteePositionAux(syncCommittee, committeeIdx, validatorIdx)
template search(syncCommittee: openarray[ValidatorPubKey]): seq[uint64] =
dag.getSubcommitteePositionsAux(syncCommittee, committeeIdx, validatorIdx)
if slot < periodStart:
return
return @[]
elif slot >= nextPeriodStart:
return search(dag.headState.data.hbsAltair.data.next_sync_committee.pubkeys.data)
else:

View File

@ -774,10 +774,10 @@ proc validateSyncCommitteeMessage*(
# Note this validation implies the validator is part of the broader
# current sync committee along with the correct subcommittee.
# This check also ensures that the validator index is in range
let positionInSubcommittee = dag.getSubcommitteePosition(
let positionsInSubcommittee = dag.getSubcommitteePositions(
msg.slot + 1, syncCommitteeIdx, msg.validator_index)
if positionInSubcommittee.isNone:
if positionsInSubcommittee.len == 0:
return errReject(
"SyncCommitteeMessage: originator not part of sync committee")
@ -824,12 +824,13 @@ proc validateSyncCommitteeMessage*(
cookedSignature.get):
return errReject("SyncCommitteeMessage: signature fails to verify")
syncCommitteeMsgPool[].addSyncCommitteeMsg(
msg.slot,
msg.beacon_block_root,
cookedSignature.get,
syncCommitteeIdx,
positionInSubcommittee.get)
for positionInSubcommittee in positionsInSubcommittee:
syncCommitteeMsgPool[].addSyncCommitteeMsg(
msg.slot,
msg.beacon_block_root,
cookedSignature.get,
syncCommitteeIdx,
positionInSubcommittee)
ok()

View File

@ -8,6 +8,8 @@
{.used.}
import
# Standard library
std/sequtils,
# Status lib
unittest2,
chronicles, chronos,
@ -17,9 +19,11 @@ import
../beacon_chain/gossip_processing/[gossip_validation, batch_validation],
../beacon_chain/fork_choice/[fork_choice_types, fork_choice],
../beacon_chain/consensus_object_pools/[
block_quarantine, blockchain_dag, block_clearance, attestation_pool],
../beacon_chain/spec/datatypes/phase0,
block_quarantine, blockchain_dag, block_clearance, attestation_pool,
sync_committee_msg_pool],
../beacon_chain/spec/datatypes/[phase0, altair],
../beacon_chain/spec/[forks, state_transition, helpers, network],
../beacon_chain/validators/validator_pool,
# Test utilities
./testutil, ./testdbutil, ./testblockutil
@ -170,3 +174,71 @@ suite "Gossip validation " & preset():
check:
fut_1_0.waitFor().error()[0] == ValidationResult.Reject
fut_1_1.waitFor().isOk()
suite "Gossip validation - Extra": # Not based on preset config
test "validateSyncCommitteeMessage":
const num_validators = SLOTS_PER_EPOCH
let
cfg = block:
var cfg = defaultRuntimeConfig
cfg.ALTAIR_FORK_EPOCH = (GENESIS_EPOCH + 1).Epoch
cfg
dag = block:
let
dag = ChainDAGRef.init(cfg, makeTestDB(num_validators), {})
taskpool = Taskpool.new()
quarantine = QuarantineRef.init(keys.newRng(), taskpool)
var cache = StateCache()
for blck in makeTestBlocks(
dag.headState.data, dag.head.root, cache,
int(SLOTS_PER_EPOCH), false, cfg = cfg):
let added =
case blck.kind
of BeaconBlockFork.Phase0:
const nilCallback = OnPhase0BlockAdded(nil)
dag.addRawBlock(quarantine, blck.phase0Block, nilCallback)
of BeaconBlockFork.Altair:
const nilCallback = OnAltairBlockAdded(nil)
dag.addRawBlock(quarantine, blck.altairBlock, nilCallback)
of BeaconBlockFork.Merge:
const nilCallback = OnMergeBlockAdded(nil)
dag.addRawBlock(quarantine, blck.mergeBlock, nilCallback)
check: added.isOk()
dag.updateHead(added[], quarantine)
dag
state = newClone(dag.headState.data.hbsAltair)
syncCommitteeIdx = 0.SyncCommitteeIndex
syncCommittee = @(dag.syncCommitteeParticipants(state[].data.slot))
subcommittee = syncCommittee.syncSubcommittee(syncCommitteeIdx)
pubkey = subcommittee[0]
expectedCount = subcommittee.count(pubkey)
index = ValidatorIndex(
state[].data.validators.mapIt(it.pubkey).find(pubKey))
validator = AttachedValidator(
pubKey: pubkey,
kind: inProcess, privKey: hackPrivKey(state[].data.validators[index]),
index: some(index))
msg = waitFor signSyncCommitteeMessage(
validator, state[].data.slot,
state[].data.fork, state[].data.genesis_validators_root, state[].root)
syncCommitteeMsgPool = newClone(SyncCommitteeMsgPool.init())
res = validateSyncCommitteeMessage(
dag, syncCommitteeMsgPool, msg, syncCommitteeIdx,
state[].data.slot.toBeaconTime(), true)
contribution = block:
var contribution: SyncCommitteeContribution
check: syncCommitteeMsgPool[].produceContribution(
state[].data.slot, state[].root, syncCommitteeIdx, contribution)
syncCommitteeMsgPool[].addSyncContribution(
contribution, contribution.signature.load.get)
contribution
aggregate = syncCommitteeMsgPool[].produceSyncAggregate(state[].root)
check:
expectedCount > 1 # Cover edge case
res.isOk
contribution.aggregation_bits.countOnes == expectedCount
aggregate.sync_committee_bits.countOnes == expectedCount