Fix VC-based sync subnet subscriptions (#4293)

* move duty tracking code to `ActionTracker`
* fix earlier duties overwriting later ones
* re-run subnet selection when new duty appears
* log upcoming duties as soon as they're known (vs 4 epochs before)
Jacek Sieka 2022-11-08 12:43:38 +01:00 committed by GitHub
parent e98cfa88cb
commit 8297b962cc
5 changed files with 131 additions and 94 deletions
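
The behavioural core of the change is in how sync duties are tracked: `registerSyncDuty` keeps the furthest `until_epoch` seen for a pubkey, so an earlier (shorter) duty no longer overwrites a later one, and every accepted registration resets `lastSyncUpdate` so that subnet selection is re-run at the next slot end. A minimal usage sketch, mirroring the test added at the end of this commit (the `rng` and `pk0` values are assumed to come from the surrounding test scaffolding):

# Illustration only; see the actual test in the last hunk of this commit.
var tracker = ActionTracker.init(rng, false)

tracker.registerSyncDuty(pk0, Epoch(1024))          # duty covers epochs < 1024
doAssert tracker.hasSyncDuty(pk0, Epoch(1023))
doAssert not tracker.hasSyncDuty(pk0, Epoch(1024))  # until_epoch is exclusive

tracker.registerSyncDuty(pk0, Epoch(1022))          # shorter duty: ignored
doAssert tracker.hasSyncDuty(pk0, Epoch(1023))

tracker.registerSyncDuty(pk0, Epoch(1025))          # longer duty: extends it
doAssert tracker.hasSyncDuty(pk0, Epoch(1024))

# Accepted registrations reset lastSyncUpdate, so the end-of-slot
# updateSyncCommitteeTopics re-runs subnet selection instead of returning early.
doAssert tracker.lastSyncUpdate.isNone()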


@@ -58,7 +58,6 @@ type
     onContributionReceived*: OnSyncContributionCallback
 
     rng: ref HmacDrbgContext
-    syncCommitteeSubscriptions*: Table[ValidatorPubKey, Epoch]
 
 func hash*(x: SyncCommitteeMsgKey): Hash =
   hashAllFields(x)


@@ -879,35 +879,61 @@ func hasSyncPubKey(node: BeaconNode, epoch: Epoch): auto =
     (func(pubkey: ValidatorPubKey): bool {.closure.} = true)
   else:
     (func(pubkey: ValidatorPubKey): bool =
-       node.syncCommitteeMsgPool.syncCommitteeSubscriptions.getOrDefault(
-         pubkey, GENESIS_EPOCH) >= epoch or
+       node.consensusManager[].actionTracker.hasSyncDuty(pubkey, epoch) or
         pubkey in node.attachedValidators[].validators)
 
-func getCurrentSyncCommiteeSubnets(node: BeaconNode, slot: Slot): SyncnetBits =
+func getCurrentSyncCommiteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits =
   let syncCommittee = withState(node.dag.headState):
     when stateFork >= BeaconStateFork.Altair:
       forkyState.data.current_sync_committee
     else:
       return static(default(SyncnetBits))
 
-  getSyncSubnets(node.hasSyncPubKey(slot.epoch), syncCommittee)
+  getSyncSubnets(node.hasSyncPubKey(epoch), syncCommittee)
+
+func getNextSyncCommitteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits =
+  let syncCommittee = withState(node.dag.headState):
+    when stateFork >= BeaconStateFork.Altair:
+      forkyState.data.next_sync_committee
+    else:
+      return static(default(SyncnetBits))
+
+  getSyncSubnets(
+    node.hasSyncPubKey((epoch.sync_committee_period + 1).start_slot().epoch),
+    syncCommittee)
+
+func getSyncCommitteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits =
+  let
+    subnets = node.getCurrentSyncCommiteeSubnets(epoch)
+    epochsToSyncPeriod = nearSyncCommitteePeriod(epoch)
+
+  # The end-slot tracker might call this when it's theoretically applicable,
+  # but more than SYNC_COMMITTEE_SUBNET_COUNT epochs from when the next sync
+  # committee period begins, in which case `epochsToNextSyncPeriod` is none.
+  if epochsToSyncPeriod.isNone or
+      node.dag.cfg.stateForkAtEpoch(epoch + epochsToSyncPeriod.get) <
+        BeaconStateFork.Altair:
+    return subnets
+
+  subnets + node.getNextSyncCommitteeSubnets(epoch)
 
 proc addAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
   node.addPhase0MessageHandlers(forkDigest, slot)
 
   # If this comes online near sync committee period, it'll immediately get
   # replaced as usual by trackSyncCommitteeTopics, which runs at slot end.
-  let currentSyncCommitteeSubnets = node.getCurrentSyncCommiteeSubnets(slot)
+  let
+    syncnets = node.getSyncCommitteeSubnets(slot.epoch)
 
   for subcommitteeIdx in SyncSubcommitteeIndex:
-    if currentSyncCommitteeSubnets[subcommitteeIdx]:
+    if syncnets[subcommitteeIdx]:
       node.network.subscribe(
         getSyncCommitteeTopic(forkDigest, subcommitteeIdx), basicParams)
 
   node.network.subscribe(
     getSyncCommitteeContributionAndProofTopic(forkDigest), basicParams)
 
-  node.network.updateSyncnetsMetadata(currentSyncCommitteeSubnets)
+  node.network.updateSyncnetsMetadata(syncnets)
 
 proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
   node.removePhase0MessageHandlers(forkDigest)
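
For orientation, `getSyncSubnets` (defined elsewhere in the tree and not touched by this commit) is what turns the `hasSyncPubKey` predicate plus a sync committee into the set of subnet bits to subscribe to. A simplified sketch of its assumed behaviour, not the actual implementation:

# Sketch: each of the SYNC_COMMITTEE_SIZE committee positions maps to one of
# SYNC_COMMITTEE_SUBNET_COUNT subcommittees; a subnet bit is set as soon as one
# of "our" pubkeys (per the predicate) occupies a position in that subcommittee.
func sketchGetSyncSubnets(
    nodeHasPubkey: proc(pubkey: ValidatorPubKey): bool {.noSideEffect.},
    syncCommittee: SyncCommittee): SyncnetBits =
  var res: SyncnetBits
  for i, pubkey in syncCommittee.pubkeys:
    if nodeHasPubkey(pubkey):
      res.setBit(i div (SYNC_COMMITTEE_SIZE div SYNC_COMMITTEE_SUBNET_COUNT))
  res

The new `getSyncCommitteeSubnets` then unions the current-period bits with the next-period bits whenever `nearSyncCommitteePeriod` reports that the next sync committee period is close enough to start subscribing ahead of time.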
@@ -920,15 +946,22 @@ proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
   node.network.unsubscribe(
     getSyncCommitteeContributionAndProofTopic(forkDigest))
 
-proc trackCurrentSyncCommitteeTopics(node: BeaconNode, slot: Slot) =
-  # Unlike trackNextSyncCommitteeTopics, just snap to the currently correct
-  # set of subscriptions, and use current_sync_committee. Furthermore, this
-  # is potentially useful at arbitrary times, so don't guard it by checking
-  # for epoch alignment.
-  let currentSyncCommitteeSubnets = node.getCurrentSyncCommiteeSubnets(slot)
+proc updateSyncCommitteeTopics(node: BeaconNode, slot: Slot) =
+  template lastSyncUpdate: untyped =
+    node.consensusManager[].actionTracker.lastSyncUpdate
+
+  if lastSyncUpdate == Opt.some(slot.sync_committee_period()) and
+      nearSyncCommitteePeriod(slot.epoch).isNone():
+    # No need to update unless we're close to the next sync committee period or
+    # new validators were registered with the action tracker
+    # TODO we _could_ skip running this in some of the "near" slots, but..
+    return
 
-  debug "trackCurrentSyncCommitteeTopics: aligning with sync committee subnets",
-    currentSyncCommitteeSubnets,
+  lastSyncUpdate = Opt.some(slot.sync_committee_period())
+
+  let syncnets = node.getSyncCommitteeSubnets(slot.epoch)
+
+  debug "Updating sync committee subnets",
+    syncnets,
     metadata_syncnets = node.network.metadata.syncnets,
     gossipState = node.gossipState
@@ -936,89 +969,28 @@ proc trackCurrentSyncCommitteeTopics(node: BeaconNode, slot: Slot) =
   # only remains relevant, currently, for one gossip transition epoch, so the
   # consequences of this not being true aren't exceptionally dire, while this
   # allows for bookkeeping simplication.
-  if currentSyncCommitteeSubnets == node.network.metadata.syncnets:
+  if syncnets == node.network.metadata.syncnets:
     return
 
   let
-    newSyncSubnets =
-      currentSyncCommitteeSubnets - node.network.metadata.syncnets
-    oldSyncSubnets =
-      node.network.metadata.syncnets - currentSyncCommitteeSubnets
+    newSyncnets =
+      syncnets - node.network.metadata.syncnets
+    oldSyncnets =
+      node.network.metadata.syncnets - syncnets
     forkDigests = node.forkDigests()
 
   for subcommitteeIdx in SyncSubcommitteeIndex:
-    doAssert not (newSyncSubnets[subcommitteeIdx] and
-                  oldSyncSubnets[subcommitteeIdx])
+    doAssert not (newSyncnets[subcommitteeIdx] and
+                  oldSyncnets[subcommitteeIdx])
     for gossipFork in node.gossipState:
       template topic(): auto =
         getSyncCommitteeTopic(forkDigests[gossipFork], subcommitteeIdx)
-      if oldSyncSubnets[subcommitteeIdx]:
+      if oldSyncnets[subcommitteeIdx]:
         node.network.unsubscribe(topic)
-      elif newSyncSubnets[subcommitteeIdx]:
+      elif newSyncnets[subcommitteeIdx]:
         node.network.subscribe(topic, basicParams)
 
-  node.network.updateSyncnetsMetadata(currentSyncCommitteeSubnets)
+  node.network.updateSyncnetsMetadata(syncnets)
 
-func getNextSyncCommitteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits =
-  let epochsToSyncPeriod = nearSyncCommitteePeriod(epoch)
-
-  # The end-slot tracker might call this when it's theoretically applicable,
-  # but more than SYNC_COMMITTEE_SUBNET_COUNT epochs from when the next sync
-  # committee period begins, in which case `epochsToNextSyncPeriod` is none.
-  if epochsToSyncPeriod.isNone or
-      node.dag.cfg.stateForkAtEpoch(epoch + epochsToSyncPeriod.get) <
-        BeaconStateFork.Altair:
-    return static(default(SyncnetBits))
-
-  let syncCommittee = withState(node.dag.headState):
-    when stateFork >= BeaconStateFork.Altair:
-      forkyState.data.next_sync_committee
-    else:
-      return static(default(SyncnetBits))
-
-  getSyncSubnets(
-    node.hasSyncPubKey(epoch + epochsToSyncPeriod.get), syncCommittee)
-
-proc trackNextSyncCommitteeTopics(node: BeaconNode, slot: Slot) =
-  let
-    epoch = slot.epoch
-    epochsToSyncPeriod = nearSyncCommitteePeriod(epoch)
-
-  if epochsToSyncPeriod.isNone or
-      node.dag.cfg.stateForkAtEpoch(epoch + epochsToSyncPeriod.get) <
-        BeaconStateFork.Altair:
-    return
-
-  # No lookahead required
-  if epochsToSyncPeriod.get == 0:
-    node.trackCurrentSyncCommitteeTopics(slot)
-    return
-
-  let nextSyncCommitteeSubnets = node.getNextSyncCommitteeSubnets(epoch)
-  let forkDigests = node.forkDigests()
-
-  var newSubcommittees: SyncnetBits
-
-  # https://github.com/ethereum/consensus-specs/blob/v1.2.0/specs/altair/validator.md#sync-committee-subnet-stability
-  for subcommitteeIdx in SyncSubcommitteeIndex:
-    if (not node.network.metadata.syncnets[subcommitteeIdx]) and
-        nextSyncCommitteeSubnets[subcommitteeIdx] and
-        node.syncCommitteeMsgPool[].isEpochLeadTime(epochsToSyncPeriod.get):
-      for gossipFork in node.gossipState:
-        node.network.subscribe(getSyncCommitteeTopic(
-          forkDigests[gossipFork], subcommitteeIdx), basicParams)
-      newSubcommittees.setBit(distinctBase(subcommitteeIdx))
-
-  debug "trackNextSyncCommitteeTopics: subscribing to sync committee subnets",
-    metadata_syncnets = node.network.metadata.syncnets,
-    nextSyncCommitteeSubnets,
-    gossipState = node.gossipState,
-    epochsToSyncPeriod = epochsToSyncPeriod.get,
-    newSubcommittees
-
-  node.network.updateSyncnetsMetadata(
-    node.network.metadata.syncnets + newSubcommittees)
 
 proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
   ## Subscribe to subnets that we are providing stability for or aggregating
@@ -1172,7 +1144,6 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
   node.syncCommitteeMsgPool[].pruneData(slot)
   if slot.is_epoch:
-    node.trackNextSyncCommitteeTopics(slot)
     node.dynamicFeeRecipientsStore[].pruneOldMappings(slot.epoch)
 
   # Update upcoming actions - we do this every slot in case a reorg happens
@@ -1208,11 +1179,9 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
       # int64 conversion is safe
       doAssert slotsToNextSyncCommitteePeriod <= SLOTS_PER_SYNC_COMMITTEE_PERIOD
-      if not node.getCurrentSyncCommiteeSubnets(slot).isZeros:
+      if not node.getCurrentSyncCommiteeSubnets(slot.epoch).isZeros:
         "current"
-      # if 0 => fallback is getCurrentSyncCommitttee so avoid duplicate effort
-      elif since_sync_committee_period_start(slot) > 0 and
-          not node.getNextSyncCommitteeSubnets(slot.epoch).isZeros:
+      elif not node.getNextSyncCommitteeSubnets(slot.epoch).isZeros:
         "in " & toTimeLeftString(
           SECONDS_PER_SLOT.int64.seconds * slotsToNextSyncCommitteePeriod.int64)
       else:
@@ -1258,6 +1227,8 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
   # The last thing we do is to perform the subscriptions and unsubscriptions for
   # the next slot, just before that slot starts - because of the advance cuttoff
   # above, this will be done just before the next slot starts
+  node.updateSyncCommitteeTopics(slot + 1)
+
   await node.updateGossipStatus(slot + 1)
 
 func syncStatus(node: BeaconNode, wallSlot: Slot): string =
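
Since `updateSyncCommitteeTopics` is now called unconditionally from `onSlotEnd` (the `node.updateSyncCommitteeTopics(slot + 1)` line above), the early-return guard at its top is what keeps the common case cheap. Rewritten as a standalone predicate purely for illustration (hypothetical helper name, not code from this commit):

func needsSyncnetUpdate(
    lastSyncUpdate: Opt[SyncCommitteePeriod], slot: Slot): bool =
  # Re-run subnet selection only when a new duty has reset lastSyncUpdate, the
  # sync committee period has changed, or the next period is near.
  lastSyncUpdate != Opt.some(slot.sync_committee_period()) or
    nearSyncCommitteePeriod(slot.epoch).isSome()

This is also why `registerSyncDuty` resets `lastSyncUpdate`: the next slot end then re-runs `getSyncCommitteeSubnets` and adjusts gossip subscriptions immediately, instead of waiting for a sync committee period boundary.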


@@ -683,8 +683,8 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
           getStateField(node.dag.headState, validators).item(
             item.validator_index).pubkey
 
-        node.syncCommitteeMsgPool
-          .syncCommitteeSubscriptions[validator_pubkey] = item.until_epoch
+        node.consensusManager[].actionTracker.registerSyncDuty(
+          validator_pubkey, item.until_epoch)
 
         node.validatorMonitor[].addAutoMonitor(
           validator_pubkey, ValidatorIndex(item.validator_index))


@@ -74,6 +74,9 @@ type
       ## duty, we'll subscribe to the corresponding subnet to collect
       ## attestations for the aggregate
 
+    lastSyncUpdate*: Opt[SyncCommitteePeriod]
+
+    syncDuties*: Table[ValidatorPubKey, Epoch]
 
 func hash*(x: AggregatorDuty): Hash =
   hashAllFields(x)
@@ -106,6 +109,26 @@ proc registerDuty*(
     debug "Registering aggregation duty", slot, subnet_id, vidx
     tracker.duties.incl(newDuty)
 
+proc registerSyncDuty*(
+    tracker: var ActionTracker, pubkey: ValidatorPubKey, until_epoch: Epoch) =
+  if tracker.currentSlot.epoch >= until_epoch:
+    return
+
+  tracker.syncDuties.withValue(pubkey, entry) do:
+    if entry[] < until_epoch:
+      debug "Updating sync duty",
+        pubkey = shortLog(pubkey), prev_until_epoch = entry[], until_epoch
+      entry[] = until_epoch
+      reset(tracker.lastSyncUpdate)
+  do:
+    debug "Registering sync duty", pubkey = shortLog(pubkey), until_epoch
+    tracker.syncDuties[pubkey] = until_epoch
+    reset(tracker.lastSyncUpdate)
+
+proc hasSyncDuty*(
+    tracker: ActionTracker, pubkey: ValidatorPubKey, epoch: Epoch): bool =
+  epoch < tracker.syncDuties.getOrDefault(pubkey, GENESIS_EPOCH)
+
 const allSubnetBits = block:
   var res: AttnetBits
   for i in 0..<res.len: res[i] = true
@@ -136,6 +159,14 @@ proc updateSlot*(tracker: var ActionTracker, wallSlot: Slot) =
   # duties at the same time
   tracker.duties.keepItIf(it.slot >= wallSlot)
 
+  block:
+    var dels: seq[ValidatorPubKey]
+    for k, v in tracker.syncDuties:
+      if wallSlot.epoch >= v:
+        dels.add k
+    for k in dels:
+      tracker.syncDuties.del(k)
+
   # Keep stability subnets for as long as validators are validating
   var toPrune: seq[ValidatorIndex]
   for k, v in tracker.knownValidators:


@@ -53,3 +53,39 @@ suite "subnet tracker":
     check:
       tracker.stabilitySubnets(Slot(0)).countOnes() == 0
       tracker.aggregateSubnets(Slot(0)).countOnes() == 0
+
+  test "should register sync committee duties":
+    var
+      tracker = ActionTracker.init(rng, false)
+      pk0 = ValidatorPubKey.fromHex("0xb4102a1f6c80e5c596a974ebd930c9f809c3587dc4d1d3634b77ff66db71e376dbc86c3252c6d140ce031f4ec6167798").get()
+      pk1 = ValidatorPubKey.fromHex("0xa00d2954717425ce047e0928e5f4ec7c0e3bbe1058db511303fd659770ddace686ee2e22ac180422e516f4c503eb2228").get()
+
+    check:
+      not tracker.hasSyncDuty(pk0, Epoch(1024))
+
+    tracker.lastSyncUpdate = Opt.some(SyncCommitteePeriod(42))
+    tracker.registerSyncDuty(pk0, Epoch(1024))
+
+    check:
+      tracker.lastSyncUpdate.isNone()
+      not tracker.hasSyncDuty(pk0, Epoch(1024))
+      not tracker.hasSyncDuty(pk1, Epoch(1023))
+      tracker.hasSyncDuty(pk0, Epoch(1023))
+
+    tracker.registerSyncDuty(pk0, Epoch(1022))
+
+    check: # Should not overwrite longer duties
+      tracker.hasSyncDuty(pk0, Epoch(1023))
+
+    tracker.registerSyncDuty(pk0, Epoch(1025))
+
+    check: # Should update existing duties
+      tracker.hasSyncDuty(pk0, Epoch(1024))
+
+    tracker.updateSlot(Epoch(1025).start_slot)
+
+    check: # should prune old duties on updateSlot
+      not tracker.hasSyncDuty(pk0, Epoch(1024))
+
+    tracker.registerSyncDuty(pk0, Epoch(1025))
+
+    check: # should not add old duties
+      not tracker.hasSyncDuty(pk0, Epoch(1024))