diff --git a/beacon_chain/consensus_object_pools/sync_committee_msg_pool.nim b/beacon_chain/consensus_object_pools/sync_committee_msg_pool.nim
index 3346e5d96..633ef6e22 100644
--- a/beacon_chain/consensus_object_pools/sync_committee_msg_pool.nim
+++ b/beacon_chain/consensus_object_pools/sync_committee_msg_pool.nim
@@ -58,7 +58,6 @@ type
     onContributionReceived*: OnSyncContributionCallback
 
     rng: ref HmacDrbgContext
-    syncCommitteeSubscriptions*: Table[ValidatorPubKey, Epoch]
 
 func hash*(x: SyncCommitteeMsgKey): Hash =
   hashAllFields(x)
diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim
index 80992dc0e..155a1ea9e 100644
--- a/beacon_chain/nimbus_beacon_node.nim
+++ b/beacon_chain/nimbus_beacon_node.nim
@@ -879,35 +879,61 @@ func hasSyncPubKey(node: BeaconNode, epoch: Epoch): auto =
     (func(pubkey: ValidatorPubKey): bool {.closure.} = true)
   else:
     (func(pubkey: ValidatorPubKey): bool =
-       node.syncCommitteeMsgPool.syncCommitteeSubscriptions.getOrDefault(
-         pubkey, GENESIS_EPOCH) >= epoch or
+       node.consensusManager[].actionTracker.hasSyncDuty(pubkey, epoch) or
         pubkey in node.attachedValidators[].validators)
 
-func getCurrentSyncCommiteeSubnets(node: BeaconNode, slot: Slot): SyncnetBits =
+func getCurrentSyncCommiteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits =
   let syncCommittee = withState(node.dag.headState):
     when stateFork >= BeaconStateFork.Altair:
       forkyState.data.current_sync_committee
     else:
       return static(default(SyncnetBits))
 
-  getSyncSubnets(node.hasSyncPubKey(slot.epoch), syncCommittee)
+  getSyncSubnets(node.hasSyncPubKey(epoch), syncCommittee)
+
+func getNextSyncCommitteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits =
+  let syncCommittee = withState(node.dag.headState):
+    when stateFork >= BeaconStateFork.Altair:
+      forkyState.data.next_sync_committee
+    else:
+      return static(default(SyncnetBits))
+
+  getSyncSubnets(
+    node.hasSyncPubKey((epoch.sync_committee_period + 1).start_slot().epoch),
+    syncCommittee)
+
+func getSyncCommitteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits =
+  let
+    subnets = node.getCurrentSyncCommiteeSubnets(epoch)
+    epochsToSyncPeriod = nearSyncCommitteePeriod(epoch)
+
+  # The end-slot tracker might call this when it's theoretically applicable,
+  # but more than SYNC_COMMITTEE_SUBNET_COUNT epochs from when the next sync
+  # committee period begins, in which case `epochsToSyncPeriod` is none.
+  if epochsToSyncPeriod.isNone or
+     node.dag.cfg.stateForkAtEpoch(epoch + epochsToSyncPeriod.get) <
+       BeaconStateFork.Altair:
+    return subnets
+
+  subnets + node.getNextSyncCommitteeSubnets(epoch)
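Taken together, the helpers above collapse into `getSyncCommitteeSubnets`: far from a sync committee period boundary, only the current committee's subnets are advertised; within the lookahead window, the next committee's subnets are added on top. Below is a minimal standalone sketch of that flow, assuming simplified stand-in types (`SyncnetBits` as a plain bool array, raw `uint64` epochs, mainnet-preset constants) rather than the real nimbus-eth2 API:

```nim
import std/options

const
  EpochsPerSyncCommitteePeriod = 256'u64  # mainnet EPOCHS_PER_SYNC_COMMITTEE_PERIOD
  SyncCommitteeSubnetCount = 4'u64        # SYNC_COMMITTEE_SUBNET_COUNT (lookahead, in epochs)

type SyncnetBits = array[4, bool]  # one flag per sync committee subnet

func nearSyncCommitteePeriod(epoch: uint64): Option[uint64] =
  ## Epochs left until the next sync committee period, if at a period
  ## boundary or within the lookahead window; otherwise `none`.
  let sincePeriodStart = epoch mod EpochsPerSyncCommitteePeriod
  if sincePeriodStart == 0:
    return some(0'u64)
  let untilNext = EpochsPerSyncCommitteePeriod - sincePeriodStart
  if untilNext <= SyncCommitteeSubnetCount:
    some(untilNext)
  else:
    none(uint64)

func union(a, b: SyncnetBits): SyncnetBits =
  for i in 0 ..< a.len:
    result[i] = a[i] or b[i]

func getSyncCommitteeSubnets(
    current, next: SyncnetBits, epoch: uint64): SyncnetBits =
  # Far from the boundary only the current committee matters; close to it,
  # start advertising the next committee's subnets as well.
  if nearSyncCommitteePeriod(epoch).isNone:
    current
  else:
    union(current, next)

when isMainModule:
  let
    current = [true, false, false, false]
    next = [false, true, false, false]
  doAssert getSyncCommitteeSubnets(current, next, 100) == current  # mid-period
  doAssert getSyncCommitteeSubnets(current, next, 255) ==
    [true, true, false, false]                                     # 1 epoch out
```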
 
 proc addAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
   node.addPhase0MessageHandlers(forkDigest, slot)
 
   # If this comes online near sync committee period, it'll immediately get
   # replaced as usual by trackSyncCommitteeTopics, which runs at slot end.
-  let currentSyncCommitteeSubnets = node.getCurrentSyncCommiteeSubnets(slot)
+  let
+    syncnets = node.getSyncCommitteeSubnets(slot.epoch)
 
   for subcommitteeIdx in SyncSubcommitteeIndex:
-    if currentSyncCommitteeSubnets[subcommitteeIdx]:
+    if syncnets[subcommitteeIdx]:
       node.network.subscribe(
         getSyncCommitteeTopic(forkDigest, subcommitteeIdx), basicParams)
 
   node.network.subscribe(
     getSyncCommitteeContributionAndProofTopic(forkDigest), basicParams)
 
-  node.network.updateSyncnetsMetadata(currentSyncCommitteeSubnets)
+  node.network.updateSyncnetsMetadata(syncnets)
 
 proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
   node.removePhase0MessageHandlers(forkDigest)
 
@@ -920,15 +946,22 @@ proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
   node.network.unsubscribe(
     getSyncCommitteeContributionAndProofTopic(forkDigest))
 
-proc trackCurrentSyncCommitteeTopics(node: BeaconNode, slot: Slot) =
-  # Unlike trackNextSyncCommitteeTopics, just snap to the currently correct
-  # set of subscriptions, and use current_sync_committee. Furthermore, this
-  # is potentially useful at arbitrary times, so don't guard it by checking
-  # for epoch alignment.
-  let currentSyncCommitteeSubnets = node.getCurrentSyncCommiteeSubnets(slot)
+proc updateSyncCommitteeTopics(node: BeaconNode, slot: Slot) =
+  template lastSyncUpdate: untyped =
+    node.consensusManager[].actionTracker.lastSyncUpdate
+  if lastSyncUpdate == Opt.some(slot.sync_committee_period()) and
+      nearSyncCommitteePeriod(slot.epoch).isNone():
+    # No need to update unless we're close to the next sync committee period or
+    # new validators were registered with the action tracker
+    # TODO we _could_ skip running this in some of the "near" slots, but..
+    return
 
-  debug "trackCurrentSyncCommitteeTopics: aligning with sync committee subnets",
-    currentSyncCommitteeSubnets,
+  lastSyncUpdate = Opt.some(slot.sync_committee_period())
+
+  let syncnets = node.getSyncCommitteeSubnets(slot.epoch)
+
+  debug "Updating sync committee subnets",
+    syncnets,
     metadata_syncnets = node.network.metadata.syncnets,
     gossipState = node.gossipState
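The `lastSyncUpdate` template above acts as a per-period cache: recomputing the subnet set is skipped when it already ran for the current sync committee period and the next period is not yet within the lookahead window. A hypothetical self-contained sketch of that gating (`needsSyncnetUpdate` and the simplified `Tracker` are illustrative names, not nimbus-eth2 API):

```nim
import std/options

const EpochsPerSyncCommitteePeriod = 256'u64

type Tracker = object
  lastSyncUpdate: Option[uint64]  # sync committee period of the last update

proc needsSyncnetUpdate(
    tracker: var Tracker, epoch: uint64, nearPeriod: bool): bool =
  ## True when the subnet set should be recomputed at this slot's end.
  let period = epoch div EpochsPerSyncCommitteePeriod
  if tracker.lastSyncUpdate == some(period) and not nearPeriod:
    return false  # already computed for this period and nothing is changing
  tracker.lastSyncUpdate = some(period)
  true

when isMainModule:
  var tracker: Tracker
  doAssert tracker.needsSyncnetUpdate(0, false)      # first call: compute
  doAssert not tracker.needsSyncnetUpdate(1, false)  # same period: skip
  doAssert tracker.needsSyncnetUpdate(253, true)     # near the boundary: compute
  tracker.lastSyncUpdate = none(uint64)              # what registerSyncDuty does
  doAssert tracker.needsSyncnetUpdate(2, false)      # forced recomputation
```

Registering a new sync duty clears the cached period (see `registerSyncDuty` in `action_tracker.nim` further down), which is what forces a recomputation at the next slot end even mid-period.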
@@ -936,89 +969,28 @@ proc trackCurrentSyncCommitteeTopics(node: BeaconNode, slot: Slot) =
   # only remains relevant, currently, for one gossip transition epoch, so the
   # consequences of this not being true aren't exceptionally dire, while this
   # allows for bookkeeping simplification.
-  if currentSyncCommitteeSubnets == node.network.metadata.syncnets:
+  if syncnets == node.network.metadata.syncnets:
     return
 
   let
-    newSyncSubnets =
-      currentSyncCommitteeSubnets - node.network.metadata.syncnets
-    oldSyncSubnets =
-      node.network.metadata.syncnets - currentSyncCommitteeSubnets
+    newSyncnets =
+      syncnets - node.network.metadata.syncnets
+    oldSyncnets =
+      node.network.metadata.syncnets - syncnets
     forkDigests = node.forkDigests()
 
   for subcommitteeIdx in SyncSubcommitteeIndex:
-    doAssert not (newSyncSubnets[subcommitteeIdx] and
-                  oldSyncSubnets[subcommitteeIdx])
+    doAssert not (newSyncnets[subcommitteeIdx] and
+                  oldSyncnets[subcommitteeIdx])
     for gossipFork in node.gossipState:
       template topic(): auto =
         getSyncCommitteeTopic(forkDigests[gossipFork], subcommitteeIdx)
-      if oldSyncSubnets[subcommitteeIdx]:
+      if oldSyncnets[subcommitteeIdx]:
         node.network.unsubscribe(topic)
-      elif newSyncSubnets[subcommitteeIdx]:
+      elif newSyncnets[subcommitteeIdx]:
         node.network.subscribe(topic, basicParams)
 
-  node.network.updateSyncnetsMetadata(currentSyncCommitteeSubnets)
-
-func getNextSyncCommitteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits =
-  let epochsToSyncPeriod = nearSyncCommitteePeriod(epoch)
-
-  # The end-slot tracker might call this when it's theoretically applicable,
-  # but more than SYNC_COMMITTEE_SUBNET_COUNT epochs from when the next sync
-  # committee period begins, in which case `epochsToNextSyncPeriod` is none.
-  if epochsToSyncPeriod.isNone or
-     node.dag.cfg.stateForkAtEpoch(epoch + epochsToSyncPeriod.get) <
-       BeaconStateFork.Altair:
-    return static(default(SyncnetBits))
-
-  let syncCommittee = withState(node.dag.headState):
-    when stateFork >= BeaconStateFork.Altair:
-      forkyState.data.next_sync_committee
-    else:
-      return static(default(SyncnetBits))
-
-  getSyncSubnets(
-    node.hasSyncPubKey(epoch + epochsToSyncPeriod.get), syncCommittee)
-
-proc trackNextSyncCommitteeTopics(node: BeaconNode, slot: Slot) =
-  let
-    epoch = slot.epoch
-    epochsToSyncPeriod = nearSyncCommitteePeriod(epoch)
-
-  if epochsToSyncPeriod.isNone or
-     node.dag.cfg.stateForkAtEpoch(epoch + epochsToSyncPeriod.get) <
-       BeaconStateFork.Altair:
-    return
-
-  # No lookahead required
-  if epochsToSyncPeriod.get == 0:
-    node.trackCurrentSyncCommitteeTopics(slot)
-    return
-
-  let nextSyncCommitteeSubnets = node.getNextSyncCommitteeSubnets(epoch)
-
-  let forkDigests = node.forkDigests()
-
-  var newSubcommittees: SyncnetBits
-
-  # https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/validator.md#sync-committee-subnet-stability
-  for subcommitteeIdx in SyncSubcommitteeIndex:
-    if (not node.network.metadata.syncnets[subcommitteeIdx]) and
-        nextSyncCommitteeSubnets[subcommitteeIdx] and
-        node.syncCommitteeMsgPool[].isEpochLeadTime(epochsToSyncPeriod.get):
-      for gossipFork in node.gossipState:
-        node.network.subscribe(getSyncCommitteeTopic(
-          forkDigests[gossipFork], subcommitteeIdx), basicParams)
-      newSubcommittees.setBit(distinctBase(subcommitteeIdx))
-
-  debug "trackNextSyncCommitteeTopics: subscribing to sync committee subnets",
-    metadata_syncnets = node.network.metadata.syncnets,
-    nextSyncCommitteeSubnets,
-    gossipState = node.gossipState,
-    epochsToSyncPeriod = epochsToSyncPeriod.get,
-    newSubcommittees
-
-  node.network.updateSyncnetsMetadata(
-    node.network.metadata.syncnets + newSubcommittees)
+  node.network.updateSyncnetsMetadata(syncnets)
 
 proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
   ## Subscribe to subnets that we are providing stability for or aggregating
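The loop above follows a plain diff pattern: compare the desired subnet set against the currently advertised metadata and only touch topics whose membership actually changed. A minimal sketch of the same pattern, assuming a simplified `SyncnetBits` and caller-supplied callbacks in place of the real gossip network:

```nim
type SyncnetBits = array[4, bool]

func `-`(a, b: SyncnetBits): SyncnetBits =
  # Set difference: bits set in `a` but not in `b`
  for i in 0 ..< a.len:
    result[i] = a[i] and not b[i]

proc applySyncnets(desired, advertised: SyncnetBits,
                   subscribe, unsubscribe: proc (subnet: int)) =
  let
    newBits = desired - advertised   # topics to join
    oldBits = advertised - desired   # topics to leave
  for i in 0 ..< desired.len:
    # A subnet cannot be both newly wanted and newly unwanted
    doAssert not (newBits[i] and oldBits[i])
    if oldBits[i]: unsubscribe(i)
    elif newBits[i]: subscribe(i)

when isMainModule:
  var log: seq[string]
  applySyncnets(
    [true, true, false, false], [false, true, true, false],
    proc (s: int) = log.add("subscribe " & $s),
    proc (s: int) = log.add("unsubscribe " & $s))
  doAssert log == @["subscribe 0", "unsubscribe 2"]
```

Subnets that stay wanted (subnet 1 above) are never churned, which keeps gossip mesh membership stable across recomputations.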
@@ -1172,7 +1144,6 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
   node.syncCommitteeMsgPool[].pruneData(slot)
 
   if slot.is_epoch:
-    node.trackNextSyncCommitteeTopics(slot)
     node.dynamicFeeRecipientsStore[].pruneOldMappings(slot.epoch)
 
   # Update upcoming actions - we do this every slot in case a reorg happens
@@ -1208,11 +1179,9 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
       # int64 conversion is safe
       doAssert slotsToNextSyncCommitteePeriod <= SLOTS_PER_SYNC_COMMITTEE_PERIOD
 
-      if not node.getCurrentSyncCommiteeSubnets(slot).isZeros:
+      if not node.getCurrentSyncCommiteeSubnets(slot.epoch).isZeros:
         "current"
-      # if 0 => fallback is getCurrentSyncCommitttee so avoid duplicate effort
-      elif since_sync_committee_period_start(slot) > 0 and
-           not node.getNextSyncCommitteeSubnets(slot.epoch).isZeros:
+      elif not node.getNextSyncCommitteeSubnets(slot.epoch).isZeros:
         "in " & toTimeLeftString(
           SECONDS_PER_SLOT.int64.seconds * slotsToNextSyncCommitteePeriod.int64)
       else:
@@ -1258,6 +1227,8 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
   # The last thing we do is to perform the subscriptions and unsubscriptions for
   # the next slot, just before that slot starts - because of the advance cutoff
   # above, this will be done just before the next slot starts
+  node.updateSyncCommitteeTopics(slot + 1)
+
   await node.updateGossipStatus(slot + 1)
 
 func syncStatus(node: BeaconNode, wallSlot: Slot): string =
diff --git a/beacon_chain/rpc/rest_validator_api.nim b/beacon_chain/rpc/rest_validator_api.nim
index bdf55e93b..506de9edb 100644
--- a/beacon_chain/rpc/rest_validator_api.nim
+++ b/beacon_chain/rpc/rest_validator_api.nim
@@ -683,8 +683,8 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
         getStateField(node.dag.headState, validators).item(
           item.validator_index).pubkey
 
-      node.syncCommitteeMsgPool
-        .syncCommitteeSubscriptions[validator_pubkey] = item.until_epoch
+      node.consensusManager[].actionTracker.registerSyncDuty(
+        validator_pubkey, item.until_epoch)
 
       node.validatorMonitor[].addAutoMonitor(
         validator_pubkey, ValidatorIndex(item.validator_index))
diff --git a/beacon_chain/validators/action_tracker.nim b/beacon_chain/validators/action_tracker.nim
index 3705f2819..46790241d 100644
--- a/beacon_chain/validators/action_tracker.nim
+++ b/beacon_chain/validators/action_tracker.nim
@@ -74,6 +74,9 @@ type
     ## duty, we'll subscribe to the corresponding subnet to collect
     ## attestations for the aggregate
 
+  lastSyncUpdate*: Opt[SyncCommitteePeriod]
+  syncDuties*: Table[ValidatorPubKey, Epoch]
+
 func hash*(x: AggregatorDuty): Hash =
   hashAllFields(x)
 
@@ -106,6 +109,26 @@ proc registerDuty*(
   debug "Registering aggregation duty", slot, subnet_id, vidx
   tracker.duties.incl(newDuty)
 
+proc registerSyncDuty*(
+    tracker: var ActionTracker, pubkey: ValidatorPubKey, until_epoch: Epoch) =
+  if tracker.currentSlot.epoch >= until_epoch:
+    return
+
+  tracker.syncDuties.withValue(pubkey, entry) do:
+    if entry[] < until_epoch:
+      debug "Updating sync duty",
+        pubkey = shortLog(pubkey), prev_until_epoch = entry[], until_epoch
+      entry[] = until_epoch
+      reset(tracker.lastSyncUpdate)
+  do:
+    debug "Registering sync duty", pubkey = shortLog(pubkey), until_epoch
+    tracker.syncDuties[pubkey] = until_epoch
+    reset(tracker.lastSyncUpdate)
+
+proc hasSyncDuty*(
+    tracker: ActionTracker, pubkey: ValidatorPubKey, epoch: Epoch): bool =
+  epoch < tracker.syncDuties.getOrDefault(pubkey, GENESIS_EPOCH)
+
 const allSubnetBits = block:
   var res: AttnetBits
   for i in 0..<res.len: res.setBit(i)
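`registerSyncDuty` and `hasSyncDuty` take over the role the removed `syncCommitteeSubscriptions` table played in the message pool: a pubkey's duty horizon only ever moves forward, `until_epoch` is exclusive, and any accepted change resets `lastSyncUpdate` so the subnets are recomputed at the next slot end. A standalone sketch of those semantics, with plain `string` keys and `uint64` epochs standing in for the real types:

```nim
import std/[options, tables]

var
  syncDuties: Table[string, uint64]  # pubkey -> until_epoch (exclusive)
  lastSyncUpdate = some(42'u64)      # pretend a subnet update already ran

proc registerSyncDuty(pubkey: string, untilEpoch, currentEpoch: uint64) =
  if currentEpoch >= untilEpoch:
    return  # the duty is already over; nothing to track
  if syncDuties.getOrDefault(pubkey, 0'u64) < untilEpoch:
    syncDuties[pubkey] = untilEpoch  # extend, never shorten, the horizon
    lastSyncUpdate = none(uint64)    # force a subnet recomputation

proc hasSyncDuty(pubkey: string, epoch: uint64): bool =
  # Exclusive bound: the duty covers epochs strictly before `until_epoch`
  epoch < syncDuties.getOrDefault(pubkey, 0'u64)

when isMainModule:
  registerSyncDuty("pk0", 1024, currentEpoch = 0)
  doAssert lastSyncUpdate.isNone
  doAssert hasSyncDuty("pk0", 1023) and not hasSyncDuty("pk0", 1024)
  registerSyncDuty("pk0", 1022, currentEpoch = 0)  # shorter horizon: ignored
  doAssert hasSyncDuty("pk0", 1023)
```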
@@ … @@ func updateSlot*(tracker: var ActionTracker, wallSlot: Slot) =
   tracker.duties.keepItIf(it.slot >= wallSlot)
 
+  block:
+    var dels: seq[ValidatorPubKey]
+    for k, v in tracker.syncDuties:
+      if wallSlot.epoch >= v:
+        dels.add k
+    for k in dels:
+      tracker.syncDuties.del(k)
+
   # Keep stability subnets for as long as validators are validating
   var toPrune: seq[ValidatorIndex]
   for k, v in tracker.knownValidators:
diff --git a/tests/test_action_tracker.nim b/tests/test_action_tracker.nim
index a9739320a..08ab9e31b 100644
--- a/tests/test_action_tracker.nim
+++ b/tests/test_action_tracker.nim
@@ -53,3 +53,39 @@ suite "subnet tracker":
     check:
       tracker.stabilitySubnets(Slot(0)).countOnes() == 0
       tracker.aggregateSubnets(Slot(0)).countOnes() == 0
+
+  test "should register sync committee duties":
+    var
+      tracker = ActionTracker.init(rng, false)
+      pk0 = ValidatorPubKey.fromHex("0xb4102a1f6c80e5c596a974ebd930c9f809c3587dc4d1d3634b77ff66db71e376dbc86c3252c6d140ce031f4ec6167798").get()
+      pk1 = ValidatorPubKey.fromHex("0xa00d2954717425ce047e0928e5f4ec7c0e3bbe1058db511303fd659770ddace686ee2e22ac180422e516f4c503eb2228").get()
+
+    check:
+      not tracker.hasSyncDuty(pk0, Epoch(1024))
+
+    tracker.lastSyncUpdate = Opt.some(SyncCommitteePeriod(42))
+    tracker.registerSyncDuty(pk0, Epoch(1024))
+    check:
+      tracker.lastSyncUpdate.isNone()
+      not tracker.hasSyncDuty(pk0, Epoch(1024))
+      not tracker.hasSyncDuty(pk1, Epoch(1023))
+      tracker.hasSyncDuty(pk0, Epoch(1023))
+
+    tracker.registerSyncDuty(pk0, Epoch(1022))
+
+    check: # Should not overwrite longer-lasting duties
+      tracker.hasSyncDuty(pk0, Epoch(1023))
+
+    tracker.registerSyncDuty(pk0, Epoch(1025))
+    check: # Should update existing duties
+      tracker.hasSyncDuty(pk0, Epoch(1024))
+
+    tracker.updateSlot(Epoch(1025).start_slot)
+
+    check: # Should prune old duties on updateSlot
+      not tracker.hasSyncDuty(pk0, Epoch(1024))
+
+    tracker.registerSyncDuty(pk0, Epoch(1025))
+
+    check: # Should not add already-expired duties
+      not tracker.hasSyncDuty(pk0, Epoch(1024))
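The pruning pass added to `updateSlot` collects expired keys first and deletes them afterwards, since a Nim `Table` must not be modified while it is being iterated. The same two-pass shape in isolation (simplified key and epoch types; `pruneSyncDuties` is a hypothetical name):

```nim
import std/tables

proc pruneSyncDuties(syncDuties: var Table[string, uint64], wallEpoch: uint64) =
  var dels: seq[string]
  for k, v in syncDuties:
    if wallEpoch >= v:   # duty horizon reached: entry is expired
      dels.add k
  for k in dels:
    syncDuties.del(k)

when isMainModule:
  var duties = {"pk0": 1024'u64, "pk1": 2048'u64}.toTable
  duties.pruneSyncDuties(1025)
  doAssert "pk0" notin duties and "pk1" in duties
```

This mirrors what the new test exercises: after `updateSlot(Epoch(1025).start_slot)`, a duty with `until_epoch` 1025 is pruned, and re-registering it at that epoch is a no-op.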