Improved electra attestations packing (#6498)

* test disjoint comittee bits

* test for aggregated attestations with disjoint bits

* improved attestation pool on-chain collecting and packing

* addressed format issues

* improved sorting method

* updated AllTests-mainnet file

* updated AllTests-mainnet file

* review corrections

* review corrections

* removed old phase0 artifacts

---------

Co-authored-by: Pedro Miranda <pedro.miranda@nimbus.team>
This commit is contained in:
Pedro Miranda 2024-08-23 12:26:35 +01:00 committed by GitHub
parent 8c2e8f2b0f
commit f9e44b2a3b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 142 additions and 44 deletions

View File

@ -7,9 +7,11 @@ AllTests-mainnet
OK: 1/1 Fail: 0/1 Skip: 0/1 OK: 1/1 Fail: 0/1 Skip: 0/1
## Attestation pool electra processing [Preset: mainnet] ## Attestation pool electra processing [Preset: mainnet]
```diff ```diff
+ Aggregated attestations with disjoint comittee bits into a single on-chain aggregate [Pres OK
+ Attestations with disjoint comittee bits and equal data into single on-chain aggregate [Pr OK
+ Can add and retrieve simple electra attestations [Preset: mainnet] OK + Can add and retrieve simple electra attestations [Preset: mainnet] OK
``` ```
OK: 1/1 Fail: 0/1 Skip: 0/1 OK: 3/3 Fail: 0/3 Skip: 0/3
## Attestation pool processing [Preset: mainnet] ## Attestation pool processing [Preset: mainnet]
```diff ```diff
+ Attestation from different branch [Preset: mainnet] OK + Attestation from different branch [Preset: mainnet] OK
@ -1038,4 +1040,4 @@ OK: 2/2 Fail: 0/2 Skip: 0/2
OK: 9/9 Fail: 0/9 Skip: 0/9 OK: 9/9 Fail: 0/9 Skip: 0/9
---TOTAL--- ---TOTAL---
OK: 691/696 Fail: 0/696 Skip: 5/696 OK: 693/698 Fail: 0/698 Skip: 5/698

View File

@ -8,6 +8,7 @@
{.push raises: [].} {.push raises: [].}
import import
std/algorithm,
# Status libraries # Status libraries
metrics, metrics,
chronicles, stew/byteutils, chronicles, stew/byteutils,
@ -873,6 +874,9 @@ proc getElectraAttestationsForBlock*(
# remain valid # remain valid
candidates.add((score, slot, addr entry, j)) candidates.add((score, slot, addr entry, j))
# Sort candidates by score use slot as a tie-breaker
candidates.sort()
# Using a greedy algorithm, select as many attestations as possible that will # Using a greedy algorithm, select as many attestations as possible that will
# fit in the block. # fit in the block.
# #
@ -887,34 +891,31 @@ proc getElectraAttestationsForBlock*(
# For each round, we'll look for the best attestation and add it to the result # For each round, we'll look for the best attestation and add it to the result
# then re-score the other candidates. # then re-score the other candidates.
var var
prevEpoch = state.data.get_previous_epoch() candidatesPerBlock: Table[(Eth2Digest, Slot), seq[electra.Attestation]]
prevEpochSpace = MAX_ATTESTATIONS_ELECTRA
var res: seq[electra.Attestation]
let totalCandidates = candidates.len() let totalCandidates = candidates.len()
while candidates.len > 0 and res.lenu64() < while candidates.len > 0 and candidatesPerBlock.lenu64() <
MAX_ATTESTATIONS_ELECTRA * MAX_COMMITTEES_PER_SLOT: MAX_ATTESTATIONS_ELECTRA * MAX_COMMITTEES_PER_SLOT:
let entryCacheKey = block: let entryCacheKey = block:
# Find the candidate with the highest score - slot is used as a let (_, _, entry, j) =
# tie-breaker so that more recent attestations are added first # Fast path for when all remaining candidates fit
if candidates.lenu64 < MAX_ATTESTATIONS_ELECTRA:
candidates[candidates.len - 1]
else:
# Get the candidate with the highest score
candidates.pop()
#TODO: Merge candidates per block structure with the candidates one
# and score possible on-chain attestations while collecting candidates
# (previous loop) and reavaluate cache key definition
let let
candidate = key = (entry.data.beacon_block_root, entry.data.slot)
# Fast path for when all remaining candidates fit newAtt = entry[].toElectraAttestation(entry[].aggregates[j])
if candidates.lenu64 < MAX_ATTESTATIONS_ELECTRA * MAX_COMMITTEES_PER_SLOT:
candidates.len - 1
else:
maxIndex(candidates)
(_, _, entry, j) = candidates[candidate]
candidates.del(candidate) # careful, `del` reorders candidates candidatesPerBlock.withValue(key, candidate):
candidate[].add newAtt
if entry[].data.target.epoch == prevEpoch: do:
if prevEpochSpace < 1: candidatesPerBlock[key] = @[newAtt]
continue # No need to rescore since we didn't add the attestation
prevEpochSpace -= 1
res.add(entry[].toElectraAttestation(entry[].aggregates[j]))
# Update cache so that the new votes are taken into account when updating # Update cache so that the new votes are taken into account when updating
# the score below # the score below
@ -938,35 +939,35 @@ proc getElectraAttestationsForBlock*(
# Only keep candidates that might add coverage # Only keep candidates that might add coverage
it.score > 0 it.score > 0
# TODO sort candidates by score - or really, rewrite the whole loop above ;) # Sort candidates by score use slot as a tie-breaker
var res2: seq[electra.Attestation] candidates.sort()
var perBlock: Table[(Eth2Digest, Slot), seq[electra.Attestation]]
for a in res: # Consolidate attestation aggregates with disjoint comittee bits into single
let key = (a.data.beacon_block_root, a.data.slot) # attestation
perBlock.mGetOrPut(key, newSeq[electra.Attestation](0)).add(a) var res: seq[electra.Attestation]
for a in candidatesPerBlock.values():
for a in perBlock.values(): if a.len > 1:
# TODO this will create on-chain aggregates that contain only one let
# committee index - this is obviously wrong but fixing requires att = compute_on_chain_aggregate(a).valueOr:
# a more significant rewrite - we should combine the best aggregates continue
# for each beacon block root res.add(att)
let x = compute_on_chain_aggregate(a).valueOr: #no on chain candidates
continue else:
res.add(a)
res2.add(x) if res.lenu64 == MAX_ATTESTATIONS_ELECTRA:
if res2.lenu64 == MAX_ATTESTATIONS_ELECTRA:
break break
let let
packingDur = Moment.now() - startPackingTick packingDur = Moment.now() - startPackingTick
debug "Packed attestations for block", debug "Packed attestations for block",
newBlockSlot, packingDur, totalCandidates, attestations = res2.len() newBlockSlot, packingDur, totalCandidates, attestations = res.len()
attestation_pool_block_attestation_packing_time.set( attestation_pool_block_attestation_packing_time.set(
packingDur.toFloatSeconds()) packingDur.toFloatSeconds())
res2 res
proc getElectraAttestationsForBlock*( proc getElectraAttestationsForBlock*(
pool: var AttestationPool, state: ForkedHashedBeaconState, pool: var AttestationPool, state: ForkedHashedBeaconState,

View File

@ -738,14 +738,15 @@ suite "Attestation pool electra processing" & preset():
## mock data. ## mock data.
setup: setup:
# Genesis state that results in 6 members per committee # Genesis state that results in 6 members per committee (2 committees total)
const TOTAL_COMMITTEES = 2
let rng = HmacDrbgContext.new() let rng = HmacDrbgContext.new()
var var
validatorMonitor = newClone(ValidatorMonitor.init()) validatorMonitor = newClone(ValidatorMonitor.init())
cfg = genesisTestRuntimeConfig(ConsensusFork.Electra) cfg = genesisTestRuntimeConfig(ConsensusFork.Electra)
dag = init( dag = init(
ChainDAGRef, cfg, ChainDAGRef, cfg,
makeTestDB(SLOTS_PER_EPOCH * 6, cfg = cfg), makeTestDB(TOTAL_COMMITTEES * TARGET_COMMITTEE_SIZE*SLOTS_PER_EPOCH * 6, cfg = cfg),
validatorMonitor, {}) validatorMonitor, {})
taskpool = Taskpool.new() taskpool = Taskpool.new()
verifier = BatchVerifier.init(rng, taskpool) verifier = BatchVerifier.init(rng, taskpool)
@ -757,7 +758,11 @@ suite "Attestation pool electra processing" & preset():
# Slot 0 is a finalized slot - won't be making attestations for it.. # Slot 0 is a finalized slot - won't be making attestations for it..
check: check:
process_slots( process_slots(
dag.cfg, state[], getStateField(state[], slot) + 1, cache, info, dag.cfg,
state[],
getStateField(state[], slot) + MIN_ATTESTATION_INCLUSION_DELAY,
cache,
info,
{}).isOk() {}).isOk()
@ -855,3 +860,93 @@ suite "Attestation pool electra processing" & preset():
pool[].addAttestation( pool[].addAttestation(
att4, @[bc1[2]], att3.loadSig, att3.data.slot.start_beacon_time) att4, @[bc1[2]], att3.loadSig, att3.data.slot.start_beacon_time)
test "Attestations with disjoint comittee bits and equal data into single on-chain aggregate" & preset():
let
bc0 = get_beacon_committee(
state[], getStateField(state[], slot), 0.CommitteeIndex, cache)
bc1 = get_beacon_committee(
state[], getStateField(state[], slot), 1.CommitteeIndex, cache)
# atestation from committee 1
attestation_1 = makeElectraAttestation(
state[], state[].latest_block_root, bc0[0], cache)
# atestation from different committee with same data as
# attestaton 1
attestation_2 = makeElectraAttestation(
state[], state[].latest_block_root, bc1[1], cache)
pool[].addAttestation(
attestation_1, @[bc0[0]], attestation_1.loadSig,
attestation_1.data.slot.start_beacon_time)
pool[].addAttestation(
attestation_2, @[bc0[1]], attestation_2.loadSig,
attestation_2.data.slot.start_beacon_time)
check:
process_slots(
defaultRuntimeConfig, state[],
getStateField(state[], slot) + MIN_ATTESTATION_INCLUSION_DELAY, cache,
info, {}).isOk()
let attestations = pool[].getElectraAttestationsForBlock(state[], cache)
check:
# A single inal chain aggregated attestation should be created
# with same data and joint committee,aggregation bits
attestations.len == 1
attestations[0].aggregation_bits.countOnes() == 2
attestations[0].committee_bits.countOnes() == 2
test "Aggregated attestations with disjoint comittee bits into a single on-chain aggregate" & preset():
let
bc0 = get_beacon_committee(
state[], getStateField(state[], slot), 0.CommitteeIndex, cache)
bc1 = get_beacon_committee(
state[], getStateField(state[], slot), 1.CommitteeIndex, cache)
# atestation from first committee
attestation_1 = makeElectraAttestation(
state[], state[].latest_block_root, bc0[0], cache)
# another attestation from first committee with same data
attestation_2 = makeElectraAttestation(
state[], state[].latest_block_root, bc0[1], cache)
# atestation from different committee with same data as
# attestaton 1
attestation_3 = makeElectraAttestation(
state[], state[].latest_block_root, bc1[1], cache)
pool[].addAttestation(
attestation_1, @[bc0[0]], attestation_1.loadSig,
attestation_1.data.slot.start_beacon_time)
pool[].addAttestation(
attestation_2, @[bc0[1]], attestation_2.loadSig,
attestation_2.data.slot.start_beacon_time)
pool[].addAttestation(
attestation_3, @[bc1[1]], attestation_3.loadSig,
attestation_3.data.slot.start_beacon_time)
check:
process_slots(
defaultRuntimeConfig, state[],
getStateField(state[], slot) + MIN_ATTESTATION_INCLUSION_DELAY, cache,
info, {}).isOk()
let attestations = pool[].getElectraAttestationsForBlock(state[], cache)
check:
# A single final chain aggregated attestation should be created
# with same data, 2 committee bits and 3 aggregation bits
attestations.len == 1
attestations[0].aggregation_bits.countOnes() == 3
attestations[0].committee_bits.countOnes() == 2