mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-22 20:42:13 +00:00
Ignore seen aggregates (#3439)
https://github.com/ethereum/consensus-specs/pull/2225 removed an ignore rule that would filter out duplicate aggregates from gossip publishing - however, this causes increased bandwidth and CPU usage as discussed in https://github.com/ethereum/consensus-specs/issues/2183 - the intent is to revert the removal and reinstate the rule. This PR implements ignore filtering which cuts down on CPU usage (fewer aggregates to validate) and bandwidth usage (less fanout of duplicates) - as #2225 points out, this may lead to a small increase in IHAVE messages.
This commit is contained in:
parent
1bfbcc48b6
commit
92e7e288e7
@ -280,6 +280,12 @@ func updateAggregates(entry: var AttestationEntry) =
|
|||||||
inc j
|
inc j
|
||||||
inc i
|
inc i
|
||||||
|
|
||||||
|
func covers(entry: AttestationEntry, bits: CommitteeValidatorsBits): bool =
|
||||||
|
for i in 0..<entry.aggregates.len():
|
||||||
|
if bits.isSubsetOf(entry.aggregates[i].aggregation_bits):
|
||||||
|
return true
|
||||||
|
false
|
||||||
|
|
||||||
proc addAttestation(entry: var AttestationEntry,
|
proc addAttestation(entry: var AttestationEntry,
|
||||||
attestation: Attestation,
|
attestation: Attestation,
|
||||||
signature: CookedSig): bool =
|
signature: CookedSig): bool =
|
||||||
@ -304,12 +310,8 @@ proc addAttestation(entry: var AttestationEntry,
|
|||||||
entry.singles[singleIndex.get()] = signature
|
entry.singles[singleIndex.get()] = signature
|
||||||
else:
|
else:
|
||||||
# More than one vote in this attestation
|
# More than one vote in this attestation
|
||||||
for i in 0..<entry.aggregates.len():
|
if entry.covers(attestation.aggregation_bits):
|
||||||
if attestation.aggregation_bits.isSubsetOf(entry.aggregates[i].aggregation_bits):
|
return false
|
||||||
trace "Aggregate already seen",
|
|
||||||
singles = entry.singles.len(),
|
|
||||||
aggregates = entry.aggregates.len()
|
|
||||||
return false
|
|
||||||
|
|
||||||
# Since we're adding a new aggregate, we can now remove existing
|
# Since we're adding a new aggregate, we can now remove existing
|
||||||
# aggregates that don't add any new votes
|
# aggregates that don't add any new votes
|
||||||
@ -376,6 +378,24 @@ proc addAttestation*(pool: var AttestationPool,
|
|||||||
if not(isNil(pool.onAttestationAdded)):
|
if not(isNil(pool.onAttestationAdded)):
|
||||||
pool.onAttestationAdded(attestation)
|
pool.onAttestationAdded(attestation)
|
||||||
|
|
||||||
|
func covers*(
|
||||||
|
pool: var AttestationPool, data: Attestationdata,
|
||||||
|
bits: CommitteeValidatorsBits): bool =
|
||||||
|
## Return true iff the given attestation already is fully covered by one of
|
||||||
|
## the existing aggregates, making it redundant
|
||||||
|
## the `var` attestation pool is needed to use `withValue`, else Table becomes
|
||||||
|
## unusably inefficient
|
||||||
|
let candidateIdx = pool.candidateIdx(data.slot)
|
||||||
|
if candidateIdx.isNone:
|
||||||
|
return false
|
||||||
|
|
||||||
|
let attestation_data_root = hash_tree_root(data)
|
||||||
|
pool.candidates[candidateIdx.get()].withValue(attestation_data_root, entry):
|
||||||
|
if entry[].covers(bits):
|
||||||
|
return true
|
||||||
|
|
||||||
|
false
|
||||||
|
|
||||||
proc addForkChoice*(pool: var AttestationPool,
|
proc addForkChoice*(pool: var AttestationPool,
|
||||||
epochRef: EpochRef,
|
epochRef: EpochRef,
|
||||||
blckRef: BlockRef,
|
blckRef: BlockRef,
|
||||||
|
@ -621,9 +621,24 @@ proc validateAggregate*(
|
|||||||
return err(v.error)
|
return err(v.error)
|
||||||
v.get()
|
v.get()
|
||||||
|
|
||||||
# [REJECT] aggregate_and_proof.selection_proof selects the validator as an
|
if pool[].covers(aggregate.data, aggregate.aggregation_bits):
|
||||||
# aggregator for the slot -- i.e. is_aggregator(state, aggregate.data.slot,
|
# https://github.com/ethereum/consensus-specs/issues/2183 - althoughh this
|
||||||
# aggregate.data.index, aggregate_and_proof.selection_proof) returns True.
|
# check was temporarily removed from the spec, the intent is to reinstate it
|
||||||
|
# per discussion in the ticket.
|
||||||
|
#
|
||||||
|
# [IGNORE] The valid aggregate attestation defined by
|
||||||
|
# `hash_tree_root(aggregate)` has _not_ already been seen
|
||||||
|
# (via aggregate gossip, within a verified block, or through the creation of
|
||||||
|
# an equivalent aggregate locally).
|
||||||
|
|
||||||
|
# Our implementation of this check is slightly different in that it doesn't
|
||||||
|
# consider aggregates from verified blocks - this would take a rather heavy
|
||||||
|
# index to work correcly under fork conditions - we also check for coverage
|
||||||
|
# of attestation bits instead of comparing with full root of aggreagte:
|
||||||
|
# this captures the spirit of the checkk by ignoring aggregates that are
|
||||||
|
# strict subsets of other, already-seen aggregates.
|
||||||
|
return errIgnore("Aggregate already covered")
|
||||||
|
|
||||||
let
|
let
|
||||||
epochRef = block:
|
epochRef = block:
|
||||||
let tmp = pool.dag.getEpochRef(target.blck, target.slot.epoch, false)
|
let tmp = pool.dag.getEpochRef(target.blck, target.slot.epoch, false)
|
||||||
@ -641,6 +656,9 @@ proc validateAggregate*(
|
|||||||
return checkedReject("Attestation: committee index not within expected range")
|
return checkedReject("Attestation: committee index not within expected range")
|
||||||
idx.get()
|
idx.get()
|
||||||
|
|
||||||
|
# [REJECT] aggregate_and_proof.selection_proof selects the validator as an
|
||||||
|
# aggregator for the slot -- i.e. is_aggregator(state, aggregate.data.slot,
|
||||||
|
# aggregate.data.index, aggregate_and_proof.selection_proof) returns True.
|
||||||
if not is_aggregator(
|
if not is_aggregator(
|
||||||
epochRef, slot, committee_index, aggregate_and_proof.selection_proof):
|
epochRef, slot, committee_index, aggregate_and_proof.selection_proof):
|
||||||
return checkedReject("Aggregate: incorrect aggregator")
|
return checkedReject("Aggregate: incorrect aggregator")
|
||||||
|
@ -202,6 +202,9 @@ suite "Attestation pool processing" & preset():
|
|||||||
att0.combine(att2)
|
att0.combine(att2)
|
||||||
att1.combine(att2)
|
att1.combine(att2)
|
||||||
|
|
||||||
|
check:
|
||||||
|
not pool[].covers(att0.data, att0.aggregation_bits)
|
||||||
|
|
||||||
pool[].addAttestation(
|
pool[].addAttestation(
|
||||||
att0, @[bc0[0], bc0[2]], att0.loadSig, att0.data.slot.start_beacon_time)
|
att0, @[bc0[0], bc0[2]], att0.loadSig, att0.data.slot.start_beacon_time)
|
||||||
pool[].addAttestation(
|
pool[].addAttestation(
|
||||||
@ -214,6 +217,7 @@ suite "Attestation pool processing" & preset():
|
|||||||
info, {}).isOk()
|
info, {}).isOk()
|
||||||
|
|
||||||
check:
|
check:
|
||||||
|
pool[].covers(att0.data, att0.aggregation_bits)
|
||||||
pool[].getAttestationsForBlock(state.data, cache).len() == 2
|
pool[].getAttestationsForBlock(state.data, cache).len() == 2
|
||||||
# Can get either aggregate here, random!
|
# Can get either aggregate here, random!
|
||||||
pool[].getAggregatedAttestation(1.Slot, 0.CommitteeIndex).isSome()
|
pool[].getAggregatedAttestation(1.Slot, 0.CommitteeIndex).isSome()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user