# beacon_chain # Copyright (c) 2018-2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. {.push raises: [].} import std/[algorithm, sequtils, sets, tables], stew/shims/hashes, eth/p2p/discoveryv5/random2, chronicles, ../spec/[crypto, digest, forks], ../spec/datatypes/altair export hashes, sets, tables, altair logScope: topics = "syncpool" const syncCommitteeMsgsRetentionSlots = 3 ## How many slots to retain sync committee ## messsages before discarding them. type SyncCommitteeMsgKey = object originator: uint64 # ValidatorIndex to avoid invalid values slot: Slot subcommitteeIdx: uint64 # SyncSubcommitteeIndex to avoid invalid values TrustedSyncCommitteeMsg* = object subcommitteeIdx*: SyncSubcommitteeIndex positionInCommittee*: uint64 signature*: CookedSig BestSyncSubcommitteeContribution* = object totalParticipants*: int participationBits*: SyncCommitteeAggregationBits signature*: CookedSig BestSyncSubcommitteeContributions* = object subnets*: array[SYNC_COMMITTEE_SUBNET_COUNT, BestSyncSubcommitteeContribution] OnSyncContributionCallback* = proc(data: SignedContributionAndProof) {.gcsafe, raises: [].} # Messages from different slots / forks may sign the same beacon block root. # Messages across slots are compatible, but not across forks (signing root). # Messages from different periods have different signers, so are incompatible. # Note that the sync committee is determined by `message.slot + 1`, the fork # is determined by `message.slot`, and both can be different from `bid.slot`. SyncMsgTarget = object bid: BlockId # Based on message `beacon_block_root` period: SyncCommitteePeriod # Based on message `slot + 1` fork: ConsensusFork # Based on message `slot` SyncCommitteeMsgPool* = object seenSyncMsgByAuthor*: Table[SyncCommitteeMsgKey, Eth2Digest] seenContributionByAuthor*: HashSet[SyncCommitteeMsgKey] syncMessages*: Table[SyncMsgTarget, seq[TrustedSyncCommitteeMsg]] bestContributions*: Table[SyncMsgTarget, BestSyncSubcommitteeContributions] onContributionReceived*: OnSyncContributionCallback rng: ref HmacDrbgContext cfg: RuntimeConfig func hash*(x: SyncCommitteeMsgKey): Hash = hashAllFields(x) func toSyncMsgTarget( cfg: RuntimeConfig, bid: BlockId, slot: Slot): SyncMsgTarget = SyncMsgTarget( bid: bid, period: (slot + 1).sync_committee_period, fork: cfg.consensusForkAtEpoch(slot.epoch)) func hash(x: SyncMsgTarget): Hash = hashAllFields(x) func `<`(x, y: SyncMsgTarget): bool = if x.bid.slot != y.bid.slot: x.bid.slot < y.bid.slot elif x.period != y.period: x.period < y.period else: x.fork < y.fork func init*(T: type SyncCommitteeMsgPool, rng: ref HmacDrbgContext, cfg: RuntimeConfig, onSyncContribution: OnSyncContributionCallback = nil ): SyncCommitteeMsgPool = T(rng: rng, cfg: cfg, onContributionReceived: onSyncContribution) func pruneData*(pool: var SyncCommitteeMsgPool, slot: Slot, force = false) = ## This should be called at the end of slot. clear pool.seenContributionByAuthor clear pool.seenSyncMsgByAuthor if slot < syncCommitteeMsgsRetentionSlots: return # Messages signing a `beacon_block_root` may remain valid over multiple slots. # Therefore, we filter by the targeted `BlockId` instead of message `slot`. let minSlotToRetain = slot - syncCommitteeMsgsRetentionSlots minEntriesToKeep = if force: 0 else: syncCommitteeMsgsRetentionSlots template pruneTable(table: untyped) = if table.len > minEntriesToKeep: var targets = table.keys().toSeq() targets.sort(order = SortOrder.Descending) for i in minEntriesToKeep ..< targets.len: if targets[i].bid.slot < minSlotToRetain: table.del targets[i] pruneTable pool.syncMessages pruneTable pool.bestContributions func isSeen*( pool: SyncCommitteeMsgPool, msg: SyncCommitteeMessage, subcommitteeIdx: SyncSubcommitteeIndex, headBid: BlockId): bool = let seenKey = SyncCommitteeMsgKey( originator: msg.validator_index, # Might be unvalidated at this point slot: msg.slot, subcommitteeIdx: subcommitteeIdx.uint64) return if seenKey notin pool.seenSyncMsgByAuthor: false elif msg.beacon_block_root == headBid.root: pool.seenSyncMsgByAuthor.getOrDefault(seenKey) == headBid.root else: true proc addSyncCommitteeMessage*( pool: var SyncCommitteeMsgPool, slot: Slot, bid: BlockId, validatorIndex: uint64, signature: CookedSig, subcommitteeIdx: SyncSubcommitteeIndex, positionsInCommittee: seq[uint64]) = let seenKey = SyncCommitteeMsgKey( originator: validatorIndex, slot: slot, subcommitteeIdx: subcommitteeIdx.uint64) pool.seenSyncMsgByAuthor[seenKey] = bid.root func registerVotes(votes: var seq[TrustedSyncCommitteeMsg]) = for position in positionsInCommittee: block addVote: for vote in votes: if vote.subcommitteeIdx == subcommitteeIdx and vote.positionInCommittee == position: break addVote votes.add TrustedSyncCommitteeMsg( subcommitteeIdx: subcommitteeIdx, positionInCommittee: position, signature: signature) let target = pool.cfg.toSyncMsgTarget(bid, slot) pool.syncMessages.mgetOrPut(target, @[]).registerVotes() debug "Sync committee message resolved", slot = slot, blockRoot = shortLog(target.bid.root), validatorIndex func computeAggregateSig(votes: seq[TrustedSyncCommitteeMsg], subcommitteeIdx: SyncSubcommitteeIndex, contribution: var SyncCommitteeContribution): bool = var aggregateSig {.noinit.}: AggregateSignature initialized = false contribution.aggregation_bits.reset() for vote in votes: if vote.subcommitteeIdx != subcommitteeIdx: continue if not contribution.aggregation_bits[vote.positionInCommittee]: if not initialized: initialized = true aggregateSig.init(vote.signature) else: aggregateSig.aggregate(vote.signature) contribution.aggregation_bits.setBit vote.positionInCommittee if initialized: contribution.signature = aggregateSig.finish.toValidatorSig else: contribution.signature = ValidatorSig.infinity initialized func produceContribution*( pool: SyncCommitteeMsgPool, slot: Slot, headBid: BlockId, subcommitteeIdx: SyncSubcommitteeIndex, outContribution: var SyncCommitteeContribution): bool = let target = pool.cfg.toSyncMsgTarget(headBid, slot) if target in pool.syncMessages: outContribution.slot = slot outContribution.beacon_block_root = headBid.root outContribution.subcommittee_index = subcommitteeIdx.asUInt64 try: computeAggregateSig(pool.syncMessages[target], subcommitteeIdx, outContribution) except KeyError: raiseAssert "We have checked for the key upfront" else: false func addContribution( contributions: var BestSyncSubcommitteeContributions, contribution: SyncCommitteeContribution) = let currentBestTotalParticipants = contributions.subnets[contribution.subcommittee_index].totalParticipants newBestTotalParticipants = countOnes(contribution.aggregation_bits) if newBestTotalParticipants > currentBestTotalParticipants: contributions.subnets[contribution.subcommittee_index] = BestSyncSubcommitteeContribution( totalParticipants: newBestTotalParticipants, participationBits: contribution.aggregation_bits, signature: contribution.signature.load.get) func isSeen*( pool: SyncCommitteeMsgPool, msg: ContributionAndProof): bool = let seenKey = SyncCommitteeMsgKey( originator: msg.aggregator_index, slot: msg.contribution.slot, subcommitteeIdx: msg.contribution.subcommittee_index) seenKey in pool.seenContributionByAuthor func covers( contributions: BestSyncSubcommitteeContributions, contribution: SyncCommitteeContribution): bool = contribution.aggregation_bits.isSubsetOf( contributions.subnets[contribution.subcommittee_index].participationBits) func covers*( pool: var SyncCommitteeMsgPool, contribution: SyncCommitteeContribution, bid: BlockId): bool = ## Return true iff the given contribution brings no new information compared ## to the contributions already seen in the pool, ie if the contriubution is a ## subset of the best contribution so far let target = pool.cfg.toSyncMsgTarget(bid, contribution.slot) pool.bestContributions.withValue(target, best): return best[].covers(contribution) return false proc addContribution(pool: var SyncCommitteeMsgPool, aggregator_index: uint64, contribution: SyncCommitteeContribution, bid: BlockId, signature: CookedSig) = let seenKey = SyncCommitteeMsgKey( originator: aggregator_index, slot: contribution.slot, subcommitteeIdx: contribution.subcommittee_index) pool.seenContributionByAuthor.incl seenKey let target = pool.cfg.toSyncMsgTarget(bid, contribution.slot) pool.bestContributions.withValue(target, contributions): contributions[].addContribution(contribution) do: var contributions: BestSyncSubcommitteeContributions contributions.addContribution(contribution) pool.bestContributions[target] = contributions proc addContribution*(pool: var SyncCommitteeMsgPool, scproof: SignedContributionAndProof, bid: BlockId, signature: CookedSig) = pool.addContribution( scproof.message.aggregator_index, scproof.message.contribution, bid, signature) if not(isNil(pool.onContributionReceived)): pool.onContributionReceived(scproof) proc produceSyncAggregateAux( contributions: BestSyncSubcommitteeContributions): SyncAggregate = var aggregateSig {.noinit.}: AggregateSignature initialized = false startTime = Moment.now aggregate: SyncAggregate for subcommitteeIdx in SyncSubcommitteeIndex: if contributions.subnets[subcommitteeIdx].totalParticipants == 0: continue for pos, value in contributions.subnets[subcommitteeIdx].participationBits: if value: let globalPos = subcommitteeIdx.asInt * SYNC_SUBCOMMITTEE_SIZE + pos aggregate.sync_committee_bits.setBit globalPos if not initialized: initialized = true aggregateSig.init(contributions.subnets[subcommitteeIdx].signature) else: aggregateSig.aggregate(contributions.subnets[subcommitteeIdx].signature) if initialized: aggregate.sync_committee_signature = aggregateSig.finish.toValidatorSig else: aggregate.sync_committee_signature = ValidatorSig.infinity let duration = Moment.now - startTime debug "SyncAggregate produced", duration, bits = aggregate.sync_committee_bits aggregate proc produceSyncAggregate*( pool: var SyncCommitteeMsgPool, bid: BlockId, signatureSlot: Slot): SyncAggregate = # Sync committee signs previous slot, relative to when new block is produced let slot = max(signatureSlot, 1.Slot) - 1 target = pool.cfg.toSyncMsgTarget(bid, slot) var contribution {.noinit.}: SyncCommitteeContribution pool.bestContributions.withValue(target, contributions): for subcommitteeIdx in SyncSubcommitteeIndex: if contributions.subnets[subcommitteeIdx].totalParticipants == 0 and pool.produceContribution(slot, bid, subcommitteeIdx, contribution): debug "Did not receive contribution, did aggregate locally", target, subcommitteeIdx contributions[].addContribution(contribution) do: var contributions: BestSyncSubcommitteeContributions didAggregate = false for subcommitteeIdx in SyncSubcommitteeIndex: if pool.produceContribution(slot, bid, subcommitteeIdx, contribution): debug "Did not receive contribution, did aggregate locally", target, subcommitteeIdx contributions.addContribution(contribution) didAggregate = true if didAggregate: pool.bestContributions[target] = contributions if target in pool.bestContributions: try: produceSyncAggregateAux(pool.bestContributions[target]) except KeyError: raiseAssert "We have checked for the key upfront" else: SyncAggregate.init() proc isEpochLeadTime*( pool: SyncCommitteeMsgPool, epochsToSyncPeriod: uint64): bool = # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.6/specs/altair/validator.md#sync-committee-subnet-stability # This ensures a uniform distribution without requiring additional state: # (1/4) = 1/4, 4 slots out # (3/4) * (1/3) = 1/4, 3 slots out # (3/4) * (2/3) * (1/2) = 1/4, 2 slots out # (3/4) * (2/3) * (1/2) * (1/1) = 1/4, 1 slot out doAssert epochsToSyncPeriod > 0 epochsToSyncPeriod == 1 or pool.rng[].rand(epochsToSyncPeriod - 1) == 0