Restore the sync committee pool pruning and add tests

This commit is contained in:
Zahary Karadjov 2021-08-30 04:00:37 +03:00 committed by zah
parent 3689c68cbf
commit 7d1efa443d
9 changed files with 318 additions and 54 deletions

View File

@ -215,6 +215,14 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
+ roundtrip OK
```
OK: 2/2 Fail: 0/2 Skip: 0/2
## Sync committee pool
```diff
+ Aggregating votes OK
+ An empty pool is safe to prune OK
+ An empty pool is safe to prune 2 OK
+ An empty pool is safe to use OK
```
OK: 4/4 Fail: 0/4 Skip: 0/4
## SyncManager test suite
```diff
+ [SyncQueue] Async pending and resetWait() test OK
@ -340,4 +348,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 36/48 Fail: 0/48 Skip: 12/48
---TOTAL---
OK: 186/198 Fail: 0/198 Skip: 12/198
OK: 190/202 Fail: 0/202 Skip: 12/202

View File

@ -91,14 +91,16 @@ type
participationBits*: SyncCommitteeAggregationBits
signature*: CookedSig
BestSyncSubcommitteeContributions* = array[SYNC_COMMITTEE_SUBNET_COUNT,
BestSyncSubcommitteeContribution]
BestSyncSubcommitteeContributions* = object
slot*: Slot
subnets*: array[SYNC_COMMITTEE_SUBNET_COUNT,
BestSyncSubcommitteeContribution]
SyncCommitteeMsgPool* = object
seenByAuthor*: HashSet[SyncCommitteeMsgKey]
seenAggregateByAuthor*: HashSet[SyncCommitteeMsgKey]
blockVotes*: Table[Eth2Digest, seq[TrustedSyncCommitteeMsg]]
bestAggregates*: Table[Eth2Digest, BestSyncSubcommitteeContributions]
seenSyncMsgByAuthor*: HashSet[SyncCommitteeMsgKey]
seenContributionByAuthor*: HashSet[SyncCommitteeMsgKey]
syncMessages*: Table[Eth2Digest, seq[TrustedSyncCommitteeMsg]]
bestContributions*: Table[Eth2Digest, BestSyncSubcommitteeContributions]
SyncCommitteeMsgPoolRef* = ref SyncCommitteeMsgPool
@ -115,10 +117,10 @@ type
voluntary_exits*: Deque[SignedVoluntaryExit] ## \
## Not a function of chain DAG branch; just used as a FIFO queue for blocks
prior_seen_attester_slashed_indices*: IntSet ##\
prior_seen_attester_slashed_indices*: IntSet ## \
## Records attester-slashed indices seen.
prior_seen_proposer_slashed_indices*: IntSet ##\
prior_seen_proposer_slashed_indices*: IntSet ## \
## Records proposer-slashed indices seen.
prior_seen_voluntary_exit_indices*: IntSet ##\

View File

@ -15,30 +15,60 @@ import
../beacon_node_types,
./block_pools_types
export
BestSyncSubcommitteeContributions,
Slot,
SyncCommitteeContribution,
SyncCommitteeIndex,
SyncCommitteeMsgPool,
SyncAggregate,
TrustedSyncCommitteeMsg
const
syncCommitteeMsgsRetentionSlots = 3
## How many slots to retain sync committee
## messsages before discarding them.
func init*(T: type SyncCommitteeMsgPool): SyncCommitteeMsgPool =
discard
func init(T: type SyncAggregate): SyncAggregate =
SyncAggregate(sync_committee_signature: ValidatorSig.infinity)
func clearPerSlotData*(pool: var SyncCommitteeMsgPool) =
clear pool.seenAggregateByAuthor
clear pool.seenByAuthor
# TODO The previously implemened pruning has proven to be too
# aggressive. We can consider a scheme where the data is pruned
# with several slots of delay to allow for late sync committee
# messages.
# clear pool.bestAggregates
# clear pool.blockVotes
func pruneData*(pool: var SyncCommitteeMsgPool, slot: Slot) =
## This should be called at the end of slot.
clear pool.seenContributionByAuthor
clear pool.seenSyncMsgByAuthor
if slot < syncCommitteeMsgsRetentionSlots:
return
let minSlotToRetain = slot - syncCommitteeMsgsRetentionSlots
var syncMsgsToDelete: seq[Eth2Digest]
var contributionsToDelete: seq[Eth2Digest]
for blockRoot, msgs in pool.syncMessages:
if msgs[0].slot < minSlotToRetain:
syncMsgsToDelete.add blockRoot
for blockRoot in syncMsgsToDelete:
pool.syncMessages.del blockRoot
for blockRoot, bestContributions in pool.bestContributions:
if bestContributions.slot < minSlotToRetain:
contributionsToDelete.add blockRoot
for blockRoot in contributionsToDelete:
pool.bestContributions.del blockRoot
func addSyncCommitteeMsg*(
pool: var SyncCommitteeMsgPool,
slot: Slot,
beaconBlockRoot: Eth2Digest,
blockRoot: Eth2Digest,
signature: CookedSig,
committeeIdx: SyncCommitteeIndex,
positionInCommittee: uint64) =
pool.blockVotes.mgetOrPut(beaconBlockRoot, @[]).add TrustedSyncCommitteeMsg(
pool.syncMessages.mgetOrPut(blockRoot, @[]).add TrustedSyncCommitteeMsg(
slot: slot,
committeeIdx: committeeIdx,
positionInCommittee: positionInCommittee,
@ -71,15 +101,15 @@ func computeAggregateSig(votes: seq[TrustedSyncCommitteeMsg],
func produceContribution*(
pool: SyncCommitteeMsgPool,
slot: Slot,
head: BlockRef,
headRoot: Eth2Digest,
committeeIdx: SyncCommitteeIndex,
outContribution: var SyncCommitteeContribution): bool =
if head.root in pool.blockVotes:
if headRoot in pool.syncMessages:
outContribution.slot = slot
outContribution.beacon_block_root = head.root
outContribution.beacon_block_root = headRoot
outContribution.subcommittee_index = committeeIdx.asUInt64
try:
return computeAggregateSig(pool.blockVotes[head.root],
return computeAggregateSig(pool.syncMessages[headRoot],
committeeIdx,
outContribution)
except KeyError:
@ -89,11 +119,15 @@ func produceContribution*(
func addAggregateAux(bestVotes: var BestSyncSubcommitteeContributions,
contribution: SyncCommitteeContribution) =
let totalParticipants = countOnes(contribution.aggregation_bits)
if totalParticipants > bestVotes[contribution.subcommittee_index].totalParticipants:
bestVotes[contribution.subcommittee_index] =
let
currentBestTotalParticipants =
bestVotes.subnets[contribution.subcommittee_index].totalParticipants
newBestTotalParticipants = countOnes(contribution.aggregation_bits)
if newBestTotalParticipants > currentBestTotalParticipants:
bestVotes.subnets[contribution.subcommittee_index] =
BestSyncSubcommitteeContribution(
totalParticipants: totalParticipants,
totalParticipants: newBestTotalParticipants,
participationBits: contribution.aggregation_bits,
signature: contribution.signature.load.get)
@ -104,44 +138,45 @@ func addSyncContribution*(
template blockRoot: auto = contribution.beacon_block_root
if blockRoot notin pool.bestAggregates:
var bestContributions: BestSyncSubcommitteeContributions
if blockRoot notin pool.bestContributions:
let totalParticipants = countOnes(contribution.aggregation_bits)
var initialBestContributions = BestSyncSubcommitteeContributions(
slot: contribution.slot)
bestContributions[contribution.subcommittee_index] =
initialBestContributions.subnets[contribution.subcommittee_index] =
BestSyncSubcommitteeContribution(
totalParticipants: totalParticipants,
participationBits: contribution.aggregation_bits,
signature: signature)
pool.bestAggregates[blockRoot] = bestContributions
pool.bestContributions[blockRoot] = initialBestContributions
else:
try:
addAggregateAux(pool.bestAggregates[blockRoot], contribution)
addAggregateAux(pool.bestContributions[blockRoot], contribution)
except KeyError:
raiseAssert "We have checked for the key upfront"
proc produceSyncAggregateAux(votes: BestSyncSubcommitteeContributions): SyncAggregate =
proc produceSyncAggregateAux(
bestContributions: BestSyncSubcommitteeContributions): SyncAggregate =
var
aggregateSig {.noInit.}: AggregateSignature
initialized = false
startTime = Moment.now
for subnetId in 0 ..< SYNC_COMMITTEE_SUBNET_COUNT:
if votes[subnetId].totalParticipants == 0:
if bestContributions.subnets[subnetId].totalParticipants == 0:
continue
for pos, value in votes[subnetId].participationBits:
for pos, value in bestContributions.subnets[subnetId].participationBits:
if value:
let globalPos = subnetId * SYNC_SUBCOMMITTEE_SIZE + pos
result.sync_committee_bits.setBit globalPos
if not initialized:
initialized = true
aggregateSig.init(votes[subnetId].signature)
aggregateSig.init(bestContributions.subnets[subnetId].signature)
else:
aggregateSig.aggregate(votes[subnetId].signature)
aggregateSig.aggregate(bestContributions.subnets[subnetId].signature)
if initialized:
result.sync_committee_signature = aggregateSig.finish.toValidatorSig
@ -154,10 +189,10 @@ proc produceSyncAggregateAux(votes: BestSyncSubcommitteeContributions): SyncAggr
proc produceSyncAggregate*(
pool: SyncCommitteeMsgPool,
target: BlockRef): SyncAggregate =
if target.root in pool.bestAggregates:
targetRoot: Eth2Digest): SyncAggregate =
if targetRoot in pool.bestContributions:
try:
produceSyncAggregateAux(pool.bestAggregates[target.root])
produceSyncAggregateAux(pool.bestContributions[targetRoot])
except KeyError:
raiseAssert "We have checked for the key upfront"
else:

View File

@ -793,10 +793,10 @@ proc validateSyncCommitteeMessage*(
slot: msg.slot,
committeeIdx: syncCommitteeIdx)
if msgKey in syncCommitteeMsgPool.seenByAuthor:
if msgKey in syncCommitteeMsgPool.seenSyncMsgByAuthor:
return errReject("SyncCommitteeMessage: duplicate message")
else:
syncCommitteeMsgPool.seenByAuthor.incl msgKey
syncCommitteeMsgPool.seenSyncMsgByAuthor.incl msgKey
block:
# [REJECT] The signature is valid for the message beacon_block_root for the
@ -871,10 +871,10 @@ proc validateSignedContributionAndProof*(
slot: msg.message.contribution.slot,
committeeIdx: committeeIdx)
if msgKey in syncCommitteeMsgPool.seenAggregateByAuthor:
return errIgnore("SignedContributionAndProof: duplicate aggregation")
if msgKey in syncCommitteeMsgPool.seenContributionByAuthor:
return errIgnore("SignedContributionAndProof: duplicate contribution")
syncCommitteeMsgPool.seenAggregateByAuthor.incl msgKey
syncCommitteeMsgPool.seenContributionByAuthor.incl msgKey
block:
# [REJECT] The aggregator's validator index is in the declared subcommittee

View File

@ -1009,7 +1009,7 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
# the database are synced with the filesystem.
node.db.checkpoint()
node.syncCommitteeMsgPool[].clearPerSlotData()
node.syncCommitteeMsgPool[].pruneData(slot)
# -1 is a more useful output than 18446744073709551615 as an indicator of
# no future attestation/proposal known.

View File

@ -381,7 +381,7 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
node.exitPool[].getProposerSlashingsForBlock(),
node.exitPool[].getAttesterSlashingsForBlock(),
node.exitPool[].getVoluntaryExitsForBlock(),
node.sync_committee_msg_pool[].produceSyncAggregate(head),
node.sync_committee_msg_pool[].produceSyncAggregate(head.root),
default(ExecutionPayload),
restore,
cache).map(proc (t: auto): auto = ForkedBeaconBlock.init(t))
@ -709,7 +709,7 @@ proc handleSyncCommitteeContributions(node: BeaconNode,
var contribution: SyncCommitteeContribution
let contributionWasProduced = node.syncCommitteeMsgPool[].produceContribution(
slot, head, candidateAggregators[i].committeeIdx, contribution)
slot, head.root, candidateAggregators[i].committeeIdx, contribution)
if contributionWasProduced:
asyncSpawn signAndSendContribution(
@ -723,7 +723,8 @@ proc handleSyncCommitteeContributions(node: BeaconNode,
debug "Failure to produce contribution",
slot, head, subnet = candidateAggregators[i].committeeIdx
notice "Contributions sent", count = contributionsSent, time
if contributionsSent > 0:
notice "Contributions sent", count = contributionsSent, time
proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
Future[BlockRef] {.async.} =

View File

@ -196,7 +196,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
for aggregator in aggregators:
var contribution: SyncCommitteeContribution
let contributionWasProduced = syncCommitteePool[].produceContribution(
slot, dag.head, aggregator.committeeIdx, contribution)
slot, dag.head.root, aggregator.committeeIdx, contribution)
if contributionWasProduced:
let
@ -285,7 +285,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
@[],
@[],
@[],
syncCommitteePool[].produceSyncAggregate(dag.head),
syncCommitteePool[].produceSyncAggregate(dag.head.root),
ExecutionPayload(),
noRollback,
cache)
@ -402,7 +402,7 @@ cli do(slots = SLOTS_PER_EPOCH * 6,
withTimer(timers[tSyncCommittees]):
handleSyncCommitteeActions(slot)
syncCommitteePool[].clearPerSlotData()
syncCommitteePool[].pruneData(slot)
# TODO if attestation pool was smarter, it would include older attestations
# too!

View File

@ -26,6 +26,7 @@ import # Unit test
./test_message_signatures,
./test_peer_pool,
./test_statediff,
./test_sync_committee_pool,
./test_sync_manager,
./test_zero_signature,
./fork_choice/tests_fork_choice,

View File

@ -0,0 +1,217 @@
import
unittest2,
../beacon_chain/ssz/bitseqs,
../beacon_chain/spec/[beaconstate, crypto, digest, helpers, presets, signatures],
../beacon_chain/consensus_object_pools/sync_committee_msg_pool,
testblockutil
func aggregate(sigs: openarray[CookedSig]): CookedSig =
var agg {.noInit.}: AggregateSignature
agg.init sigs[0]
for i in 1 ..< sigs.len:
agg.aggregate sigs[i]
agg.finish
suite "Sync committee pool":
setup:
var pool = SyncCommitteeMsgPool.init()
test "An empty pool is safe to use":
let headRoot = eth2digest(@[1.byte, 2, 3])
var outContribution: SyncCommitteeContribution
let success = pool.produceContribution(
Slot(1),
headRoot,
SyncCommitteeIndex(0),
outContribution)
check(success == false)
let aggregate = pool.produceSyncAggregate(headRoot)
check:
aggregate.sync_committee_bits.isZeros
aggregate.sync_committee_signature == ValidatorSig.infinity
test "An empty pool is safe to prune":
pool.pruneData(Slot(0))
test "An empty pool is safe to prune 2":
pool.pruneData(Slot(10000))
test "Aggregating votes":
let
fork = altairFork(defaultRuntimeConfig)
genesisValidatorsRoot = eth2digest(@[5.byte, 6, 7])
privkey1 = makeFakeValidatorPrivKey(1)
privkey2 = makeFakeValidatorPrivKey(2)
privkey3 = makeFakeValidatorPrivKey(3)
privkey4 = makeFakeValidatorPrivKey(4)
root1 = eth2digest(@[1.byte])
root2 = eth2digest(@[1.byte, 2])
root3 = eth2digest(@[1.byte, 2, 3])
root1Slot = Slot(100)
root2Slot = Slot(101)
root3Slot = Slot(101)
subcommittee1 = SyncCommitteeIndex(0)
subcommittee2 = SyncCommitteeIndex(1)
sig1 = blsSign(privkey1, sync_committee_msg_signing_root(
fork, root1Slot.epoch, genesisValidatorsRoot, root1).data)
sig2 = blsSign(privkey2, sync_committee_msg_signing_root(
fork, root2Slot.epoch, genesisValidatorsRoot, root1).data)
sig3 = blsSign(privkey3, sync_committee_msg_signing_root(
fork, root3Slot.epoch, genesisValidatorsRoot, root1).data)
sig4 = blsSign(privkey4, sync_committee_msg_signing_root(
fork, root3Slot.epoch, genesisValidatorsRoot, root2).data)
# Inserting sync committee messages
#
pool.addSyncCommitteeMsg(root1Slot, root1, sig1, subcommittee1, 1)
pool.addSyncCommitteeMsg(root1Slot, root1, sig2, subcommittee1, 10)
pool.addSyncCommitteeMsg(root2Slot, root1, sig3, subcommittee2, 7)
pool.addSyncCommitteeMsg(root2Slot, root2, sig4, subcommittee2, 3)
# Producing contributions
#
block:
# Checking a committee where there was no activity:
var outContribution: SyncCommitteeContribution
let success = pool.produceContribution(
root2Slot,
root2,
subcommittee1,
outContribution)
check:
not success
block:
# Checking a committee where 2 signatures should have been aggregated:
var outContribution: SyncCommitteeContribution
let success = pool.produceContribution(
root1Slot,
root1,
subcommittee1,
outContribution)
let expectedSig = aggregate [sig1, sig2]
check:
success
outContribution.slot == root1Slot
outContribution.beacon_block_root == root1
outContribution.subcommittee_index == subcommittee1.uint64
outContribution.aggregation_bits.countOnes == 2
outContribution.aggregation_bits[1] == true
outContribution.aggregation_bits[8] == false
outContribution.aggregation_bits[10] == true
outContribution.signature == expectedSig.toValidatorSig
pool.addSyncContribution(outContribution, expectedSig)
block:
# Checking a committee with a signle participant:
var outContribution: SyncCommitteeContribution
let success = pool.produceContribution(
root1Slot,
root1,
subcommittee2,
outContribution)
check:
success
outContribution.slot == root1Slot
outContribution.beacon_block_root == root1
outContribution.subcommittee_index == subcommittee2.uint64
outContribution.aggregation_bits.countOnes == 1
outContribution.aggregation_bits[7] == true
outContribution.signature == sig3.toValidatorSig
pool.addSyncContribution(outContribution, sig3)
block:
# Checking another committee with a signle participant
# voting for a different block:
var outContribution: SyncCommitteeContribution
let success = pool.produceContribution(
root2Slot,
root2,
subcommittee2,
outContribution)
check:
success
outContribution.slot == root2Slot
outContribution.beacon_block_root == root2
outContribution.subcommittee_index == subcommittee2.uint64
outContribution.aggregation_bits.countOnes == 1
outContribution.aggregation_bits[3] == true
outContribution.signature == sig4.toValidatorSig
pool.addSyncContribution(outContribution, sig4)
block:
# Checking a block root nobody voted for
var outContribution: SyncCommitteeContribution
let success = pool.produceContribution(
root3Slot,
root3,
subcommittee2,
outContribution)
check:
not success
# Obtaining a SyncAggregate
#
block:
# Checking for a block that got no votes
let aggregate = pool.produceSyncAggregate(root3)
check:
aggregate.sync_committee_bits.isZeros
aggregate.sync_committee_signature == ValidatorSig.infinity
block:
# Checking for a block that got votes from 1 committee
let aggregate = pool.produceSyncAggregate(root2)
check:
aggregate.sync_committee_bits.countOnes == 1
aggregate.sync_committee_signature == sig4.toValidatorSig
block:
# Checking for a block that got votes from 2 committees
let aggregate = pool.produceSyncAggregate(root1)
let expectedSig = aggregate [sig1, sig2, sig3]
check:
aggregate.sync_committee_bits.countOnes == 3
aggregate.sync_committee_signature == expectedSig.toValidatorSig
# Pruning the data
#
pool.pruneData(Slot(200))
block:
# After pruning, all votes are gone
var outContribution: SyncCommitteeContribution
let success = pool.produceContribution(
root1Slot,
root1,
subcommittee1,
outContribution)
check:
not success
let aggregate = pool.produceSyncAggregate(root2)
check:
aggregate.sync_committee_bits.isZeros
aggregate.sync_committee_signature == ValidatorSig.infinity