mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-18 18:42:35 +00:00
cd4de1357a
Including sync contributions into a block affects validator rewards. When we have not received aggregate sync contributions, but have seen individual messages, we can produce the contributions locally, improving validator rewards when subscribing to all subnets or when having a non-aggregating attached validator in the sync committee.
375 lines
13 KiB
Nim
375 lines
13 KiB
Nim
# beacon_chain
|
|
# Copyright (c) 2018-2024 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
{.push raises: [].}
|
|
|
|
import
|
|
std/[algorithm, sequtils, sets, tables],
|
|
stew/shims/hashes,
|
|
eth/p2p/discoveryv5/random2,
|
|
chronicles,
|
|
../spec/[crypto, digest, forks],
|
|
../spec/datatypes/altair
|
|
|
|
export hashes, sets, tables, altair
|
|
|
|
logScope:
|
|
topics = "syncpool"
|
|
|
|
const
|
|
syncCommitteeMsgsRetentionSlots = 3
|
|
## How many slots to retain sync committee
|
|
## messsages before discarding them.
|
|
|
|
type
|
|
SyncCommitteeMsgKey = object
|
|
originator: uint64 # ValidatorIndex to avoid invalid values
|
|
slot: Slot
|
|
subcommitteeIdx: uint64 # SyncSubcommitteeIndex to avoid invalid values
|
|
|
|
TrustedSyncCommitteeMsg* = object
|
|
subcommitteeIdx*: SyncSubcommitteeIndex
|
|
positionInCommittee*: uint64
|
|
signature*: CookedSig
|
|
|
|
BestSyncSubcommitteeContribution* = object
|
|
totalParticipants*: int
|
|
participationBits*: SyncCommitteeAggregationBits
|
|
signature*: CookedSig
|
|
|
|
BestSyncSubcommitteeContributions* = object
|
|
subnets*: array[SYNC_COMMITTEE_SUBNET_COUNT,
|
|
BestSyncSubcommitteeContribution]
|
|
|
|
OnSyncContributionCallback* =
|
|
proc(data: SignedContributionAndProof) {.gcsafe, raises: [].}
|
|
|
|
# Messages from different slots / forks may sign the same beacon block root.
|
|
# Messages across slots are compatible, but not across forks (signing root).
|
|
# Messages from different periods have different signers, so are incompatible.
|
|
# Note that the sync committee is determined by `message.slot + 1`, the fork
|
|
# is determined by `message.slot`, and both can be different from `bid.slot`.
|
|
SyncMsgTarget = object
|
|
bid: BlockId # Based on message `beacon_block_root`
|
|
period: SyncCommitteePeriod # Based on message `slot + 1`
|
|
fork: ConsensusFork # Based on message `slot`
|
|
|
|
SyncCommitteeMsgPool* = object
|
|
seenSyncMsgByAuthor*: Table[SyncCommitteeMsgKey, Eth2Digest]
|
|
seenContributionByAuthor*: HashSet[SyncCommitteeMsgKey]
|
|
syncMessages*: Table[SyncMsgTarget, seq[TrustedSyncCommitteeMsg]]
|
|
bestContributions*: Table[SyncMsgTarget, BestSyncSubcommitteeContributions]
|
|
onContributionReceived*: OnSyncContributionCallback
|
|
|
|
rng: ref HmacDrbgContext
|
|
cfg: RuntimeConfig
|
|
|
|
func hash*(x: SyncCommitteeMsgKey): Hash =
|
|
hashAllFields(x)
|
|
|
|
func toSyncMsgTarget(
|
|
cfg: RuntimeConfig, bid: BlockId, slot: Slot): SyncMsgTarget =
|
|
SyncMsgTarget(
|
|
bid: bid,
|
|
period: (slot + 1).sync_committee_period,
|
|
fork: cfg.consensusForkAtEpoch(slot.epoch))
|
|
|
|
func hash(x: SyncMsgTarget): Hash =
|
|
hashAllFields(x)
|
|
|
|
func `<`(x, y: SyncMsgTarget): bool =
|
|
if x.bid.slot != y.bid.slot:
|
|
x.bid.slot < y.bid.slot
|
|
elif x.period != y.period:
|
|
x.period < y.period
|
|
else:
|
|
x.fork < y.fork
|
|
|
|
func init*(T: type SyncCommitteeMsgPool,
|
|
rng: ref HmacDrbgContext,
|
|
cfg: RuntimeConfig,
|
|
onSyncContribution: OnSyncContributionCallback = nil
|
|
): SyncCommitteeMsgPool =
|
|
T(rng: rng, cfg: cfg, onContributionReceived: onSyncContribution)
|
|
|
|
func pruneData*(pool: var SyncCommitteeMsgPool, slot: Slot, force = false) =
|
|
## This should be called at the end of slot.
|
|
clear pool.seenContributionByAuthor
|
|
clear pool.seenSyncMsgByAuthor
|
|
|
|
if slot < syncCommitteeMsgsRetentionSlots:
|
|
return
|
|
|
|
# Messages signing a `beacon_block_root` may remain valid over multiple slots.
|
|
# Therefore, we filter by the targeted `BlockId` instead of message `slot`.
|
|
let
|
|
minSlotToRetain = slot - syncCommitteeMsgsRetentionSlots
|
|
minEntriesToKeep = if force: 0 else: syncCommitteeMsgsRetentionSlots
|
|
|
|
template pruneTable(table: untyped) =
|
|
if table.len > minEntriesToKeep:
|
|
var targets = table.keys().toSeq()
|
|
targets.sort(order = SortOrder.Descending)
|
|
for i in minEntriesToKeep ..< targets.len:
|
|
if targets[i].bid.slot < minSlotToRetain:
|
|
table.del targets[i]
|
|
|
|
pruneTable pool.syncMessages
|
|
pruneTable pool.bestContributions
|
|
|
|
func isSeen*(
|
|
pool: SyncCommitteeMsgPool,
|
|
msg: SyncCommitteeMessage,
|
|
subcommitteeIdx: SyncSubcommitteeIndex,
|
|
headBid: BlockId): bool =
|
|
let seenKey = SyncCommitteeMsgKey(
|
|
originator: msg.validator_index, # Might be unvalidated at this point
|
|
slot: msg.slot,
|
|
subcommitteeIdx: subcommitteeIdx.uint64)
|
|
return
|
|
if seenKey notin pool.seenSyncMsgByAuthor:
|
|
false
|
|
elif msg.beacon_block_root == headBid.root:
|
|
pool.seenSyncMsgByAuthor.getOrDefault(seenKey) == headBid.root
|
|
else:
|
|
true
|
|
|
|
proc addSyncCommitteeMessage*(
|
|
pool: var SyncCommitteeMsgPool,
|
|
slot: Slot,
|
|
bid: BlockId,
|
|
validatorIndex: uint64,
|
|
signature: CookedSig,
|
|
subcommitteeIdx: SyncSubcommitteeIndex,
|
|
positionsInCommittee: seq[uint64]) =
|
|
let seenKey = SyncCommitteeMsgKey(
|
|
originator: validatorIndex,
|
|
slot: slot,
|
|
subcommitteeIdx: subcommitteeIdx.uint64)
|
|
pool.seenSyncMsgByAuthor[seenKey] = bid.root
|
|
|
|
func registerVotes(votes: var seq[TrustedSyncCommitteeMsg]) =
|
|
for position in positionsInCommittee:
|
|
block addVote:
|
|
for vote in votes:
|
|
if vote.subcommitteeIdx == subcommitteeIdx and
|
|
vote.positionInCommittee == position:
|
|
break addVote
|
|
votes.add TrustedSyncCommitteeMsg(
|
|
subcommitteeIdx: subcommitteeIdx,
|
|
positionInCommittee: position,
|
|
signature: signature)
|
|
let target = pool.cfg.toSyncMsgTarget(bid, slot)
|
|
pool.syncMessages.mgetOrPut(target, @[]).registerVotes()
|
|
|
|
debug "Sync committee message resolved",
|
|
slot = slot, blockRoot = shortLog(target.bid.root), validatorIndex
|
|
|
|
func computeAggregateSig(votes: seq[TrustedSyncCommitteeMsg],
|
|
subcommitteeIdx: SyncSubcommitteeIndex,
|
|
contribution: var SyncCommitteeContribution): bool =
|
|
var
|
|
aggregateSig {.noinit.}: AggregateSignature
|
|
initialized = false
|
|
|
|
contribution.aggregation_bits.reset()
|
|
for vote in votes:
|
|
if vote.subcommitteeIdx != subcommitteeIdx:
|
|
continue
|
|
|
|
if not contribution.aggregation_bits[vote.positionInCommittee]:
|
|
if not initialized:
|
|
initialized = true
|
|
aggregateSig.init(vote.signature)
|
|
else:
|
|
aggregateSig.aggregate(vote.signature)
|
|
|
|
contribution.aggregation_bits.setBit vote.positionInCommittee
|
|
|
|
if initialized:
|
|
contribution.signature = aggregateSig.finish.toValidatorSig
|
|
else:
|
|
contribution.signature = ValidatorSig.infinity
|
|
|
|
initialized
|
|
|
|
func produceContribution*(
|
|
pool: SyncCommitteeMsgPool,
|
|
slot: Slot,
|
|
headBid: BlockId,
|
|
subcommitteeIdx: SyncSubcommitteeIndex,
|
|
outContribution: var SyncCommitteeContribution): bool =
|
|
let target = pool.cfg.toSyncMsgTarget(headBid, slot)
|
|
if target in pool.syncMessages:
|
|
outContribution.slot = slot
|
|
outContribution.beacon_block_root = headBid.root
|
|
outContribution.subcommittee_index = subcommitteeIdx.asUInt64
|
|
try:
|
|
computeAggregateSig(pool.syncMessages[target],
|
|
subcommitteeIdx,
|
|
outContribution)
|
|
except KeyError:
|
|
raiseAssert "We have checked for the key upfront"
|
|
else:
|
|
false
|
|
|
|
func addContribution(
|
|
contributions: var BestSyncSubcommitteeContributions,
|
|
contribution: SyncCommitteeContribution) =
|
|
let
|
|
currentBestTotalParticipants =
|
|
contributions.subnets[contribution.subcommittee_index].totalParticipants
|
|
newBestTotalParticipants = countOnes(contribution.aggregation_bits)
|
|
|
|
if newBestTotalParticipants > currentBestTotalParticipants:
|
|
contributions.subnets[contribution.subcommittee_index] =
|
|
BestSyncSubcommitteeContribution(
|
|
totalParticipants: newBestTotalParticipants,
|
|
participationBits: contribution.aggregation_bits,
|
|
signature: contribution.signature.load.get)
|
|
|
|
func isSeen*(
|
|
pool: SyncCommitteeMsgPool,
|
|
msg: ContributionAndProof): bool =
|
|
let seenKey = SyncCommitteeMsgKey(
|
|
originator: msg.aggregator_index,
|
|
slot: msg.contribution.slot,
|
|
subcommitteeIdx: msg.contribution.subcommittee_index)
|
|
seenKey in pool.seenContributionByAuthor
|
|
|
|
func covers(
|
|
contributions: BestSyncSubcommitteeContributions,
|
|
contribution: SyncCommitteeContribution): bool =
|
|
contribution.aggregation_bits.isSubsetOf(
|
|
contributions.subnets[contribution.subcommittee_index].participationBits)
|
|
|
|
func covers*(
|
|
pool: var SyncCommitteeMsgPool,
|
|
contribution: SyncCommitteeContribution,
|
|
bid: BlockId): bool =
|
|
## Return true iff the given contribution brings no new information compared
|
|
## to the contributions already seen in the pool, ie if the contriubution is a
|
|
## subset of the best contribution so far
|
|
let target = pool.cfg.toSyncMsgTarget(bid, contribution.slot)
|
|
pool.bestContributions.withValue(target, best):
|
|
return best[].covers(contribution)
|
|
|
|
return false
|
|
|
|
proc addContribution(pool: var SyncCommitteeMsgPool,
|
|
aggregator_index: uint64,
|
|
contribution: SyncCommitteeContribution,
|
|
bid: BlockId,
|
|
signature: CookedSig) =
|
|
let seenKey = SyncCommitteeMsgKey(
|
|
originator: aggregator_index,
|
|
slot: contribution.slot,
|
|
subcommitteeIdx: contribution.subcommittee_index)
|
|
pool.seenContributionByAuthor.incl seenKey
|
|
|
|
let target = pool.cfg.toSyncMsgTarget(bid, contribution.slot)
|
|
pool.bestContributions.withValue(target, contributions):
|
|
contributions[].addContribution(contribution)
|
|
do:
|
|
var contributions: BestSyncSubcommitteeContributions
|
|
contributions.addContribution(contribution)
|
|
pool.bestContributions[target] = contributions
|
|
|
|
proc addContribution*(pool: var SyncCommitteeMsgPool,
|
|
scproof: SignedContributionAndProof,
|
|
bid: BlockId,
|
|
signature: CookedSig) =
|
|
pool.addContribution(
|
|
scproof.message.aggregator_index,
|
|
scproof.message.contribution,
|
|
bid, signature)
|
|
|
|
if not(isNil(pool.onContributionReceived)):
|
|
pool.onContributionReceived(scproof)
|
|
|
|
proc produceSyncAggregateAux(
|
|
contributions: BestSyncSubcommitteeContributions): SyncAggregate =
|
|
var
|
|
aggregateSig {.noinit.}: AggregateSignature
|
|
initialized = false
|
|
startTime = Moment.now
|
|
aggregate: SyncAggregate
|
|
for subcommitteeIdx in SyncSubcommitteeIndex:
|
|
if contributions.subnets[subcommitteeIdx].totalParticipants == 0:
|
|
continue
|
|
|
|
for pos, value in contributions.subnets[subcommitteeIdx].participationBits:
|
|
if value:
|
|
let globalPos = subcommitteeIdx.asInt * SYNC_SUBCOMMITTEE_SIZE + pos
|
|
aggregate.sync_committee_bits.setBit globalPos
|
|
|
|
if not initialized:
|
|
initialized = true
|
|
aggregateSig.init(contributions.subnets[subcommitteeIdx].signature)
|
|
else:
|
|
aggregateSig.aggregate(contributions.subnets[subcommitteeIdx].signature)
|
|
|
|
if initialized:
|
|
aggregate.sync_committee_signature = aggregateSig.finish.toValidatorSig
|
|
else:
|
|
aggregate.sync_committee_signature = ValidatorSig.infinity
|
|
|
|
let duration = Moment.now - startTime
|
|
debug "SyncAggregate produced", duration,
|
|
bits = aggregate.sync_committee_bits
|
|
|
|
aggregate
|
|
|
|
proc produceSyncAggregate*(
|
|
pool: var SyncCommitteeMsgPool,
|
|
bid: BlockId,
|
|
signatureSlot: Slot): SyncAggregate =
|
|
# Sync committee signs previous slot, relative to when new block is produced
|
|
let
|
|
slot = max(signatureSlot, 1.Slot) - 1
|
|
target = pool.cfg.toSyncMsgTarget(bid, slot)
|
|
|
|
var contribution {.noinit.}: SyncCommitteeContribution
|
|
pool.bestContributions.withValue(target, contributions):
|
|
for subcommitteeIdx in SyncSubcommitteeIndex:
|
|
if contributions.subnets[subcommitteeIdx].totalParticipants == 0 and
|
|
pool.produceContribution(slot, bid, subcommitteeIdx, contribution):
|
|
debug "Did not receive contribution, did aggregate locally",
|
|
target, subcommitteeIdx
|
|
contributions[].addContribution(contribution)
|
|
do:
|
|
var
|
|
contributions: BestSyncSubcommitteeContributions
|
|
didAggregate = false
|
|
for subcommitteeIdx in SyncSubcommitteeIndex:
|
|
if pool.produceContribution(slot, bid, subcommitteeIdx, contribution):
|
|
debug "Did not receive contribution, did aggregate locally",
|
|
target, subcommitteeIdx
|
|
contributions.addContribution(contribution)
|
|
didAggregate = true
|
|
if didAggregate:
|
|
pool.bestContributions[target] = contributions
|
|
|
|
if target in pool.bestContributions:
|
|
try:
|
|
produceSyncAggregateAux(pool.bestContributions[target])
|
|
except KeyError:
|
|
raiseAssert "We have checked for the key upfront"
|
|
else:
|
|
SyncAggregate.init()
|
|
|
|
proc isEpochLeadTime*(
|
|
pool: SyncCommitteeMsgPool, epochsToSyncPeriod: uint64): bool =
|
|
# https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.3/specs/altair/validator.md#sync-committee-subnet-stability
|
|
# This ensures a uniform distribution without requiring additional state:
|
|
# (1/4) = 1/4, 4 slots out
|
|
# (3/4) * (1/3) = 1/4, 3 slots out
|
|
# (3/4) * (2/3) * (1/2) = 1/4, 2 slots out
|
|
# (3/4) * (2/3) * (1/2) * (1/1) = 1/4, 1 slot out
|
|
doAssert epochsToSyncPeriod > 0
|
|
epochsToSyncPeriod == 1 or pool.rng[].rand(epochsToSyncPeriod - 1) == 0
|