diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 5f5f7435b..b65cba86f 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -359,11 +359,12 @@ proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) await allFutures(attestationSubscriptions) -proc updateStabilitySubnetMetadata(node: BeaconNode, stabilitySubnet: uint64) = +proc updateStabilitySubnetMetadata( + node: BeaconNode, stabilitySubnets: set[uint8]) = # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#metadata node.network.metadata.seq_number += 1 for subnet in 0'u8 ..< ATTESTATION_SUBNET_COUNT: - node.network.metadata.attnets[subnet] = (subnet == stabilitySubnet) + node.network.metadata.attnets[subnet] = (subnet in stabilitySubnets) # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#attestation-subnet-bitfield @@ -374,12 +375,38 @@ proc updateStabilitySubnetMetadata(node: BeaconNode, stabilitySubnet: uint64) = # be the correct one and the ENR will not increase in size. warn "Failed to update record on subnet cycle", error = res.error else: - debug "Stability subnet changed, updated ENR attnets", stabilitySubnet + debug "Stability subnets changed; updated ENR attnets", stabilitySubnets -proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) {.async.} = +func getStabilitySubnets(stabilitySubnets: auto): set[uint8] = + for subnetInfo in stabilitySubnets: + result.incl subnetInfo.subnet + +proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} = static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1 - let epochParity = slot.epoch mod 2 + # Only know RANDAO mix, which determines shuffling seed, one epoch in + # advance. When node.chainDag.headState.data.data.slot.epoch is ahead + # of wallSlot, the clock's just incorrect. If the state slot's behind + # wallSlot, it would have to look more than MIN_SEED_LOOKAHEAD epochs + # ahead to compute the shuffling determining the beacon committees. + if node.chainDag.headState.data.data.slot.epoch != wallSlot.epoch: + debug "Requested attestation subnets too far in advance", + wallSlot, + stateSlot = node.chainDag.headState.data.data.slot + return + + if node.attestationSubnets.nextCycleEpoch > wallSlot.epoch: + return + node.attestationSubnets.nextCycleEpoch = wallSlot.epoch + 1 + + # This works so long as at least one block in an epoch provides a basis for + # calculating the shuffling for the next epoch. It will keep checking for a + # block, each slot, until a block comes in, even if the first few blocks in + # an epoch are missing. If a whole epoch without blocks occurs, it's not as + # important to attest regardless, as those upcoming blocks will hit maximum + # attestations quickly and any individual attestation's likelihood of being + # selected is low. + let epochParity = wallSlot.epoch mod 2 var attachedValidators: seq[ValidatorIndex] for validatorIndex in 0 ..< node.chainDag.headState.data.data.validators.len: if node.getAttachedValidator( @@ -392,9 +419,10 @@ proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) {.async.} = let (newAttestationSubnets, expiringSubnets, newSubnets) = get_attestation_subnet_changes( node.chainDag.headState.data.data, attachedValidators, - node.attestationSubnets, slot.epoch) + node.attestationSubnets) - let prevStabilitySubnet = node.attestationSubnets.stabilitySubnet + let prevStabilitySubnets = + getStabilitySubnets(node.attestationSubnets.stabilitySubnets) node.attestationSubnets = newAttestationSubnets debug "Attestation subnets", @@ -403,9 +431,8 @@ proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) {.async.} = node.attestationSubnets.subscribedSubnets[1 - epochParity], upcoming_subnets = node.attestationSubnets.subscribedSubnets[epochParity], new_subnets = newSubnets, - stability_subnet = node.attestationSubnets.stabilitySubnet, - stability_subnet_expiration_epoch = - node.attestationSubnets.stabilitySubnetExpirationEpoch + epoch = wallSlot.epoch, + num_stability_subnets = node.attestationSubnets.stabilitySubnets.len block: var unsubscriptions: seq[Future[void]] = @[] @@ -417,9 +444,10 @@ proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) {.async.} = await node.installAttestationSubnetHandlers(newSubnets) - let stabilitySubnet = node.attestationSubnets.stabilitySubnet - if stabilitySubnet != prevStabilitySubnet: - node.updateStabilitySubnetMetadata(stabilitySubnet) + let stabilitySubnets = + getStabilitySubnets(node.attestationSubnets.stabilitySubnets) + if stabilitySubnets != prevStabilitySubnets: + node.updateStabilitySubnetMetadata(stabilitySubnets) proc getAttestationSubnetHandlers(node: BeaconNode): Future[void] = var initialSubnets: set[uint8] @@ -431,18 +459,28 @@ proc getAttestationSubnetHandlers(node: BeaconNode): Future[void] = # We might want to reuse the previous stability subnet if not expired when: # - Restarting the node with a presistent netkey # - When going from synced -> syncing -> synced state - let wallEpoch = node.beaconClock.now().slotOrZero().epoch - node.attestationSubnets.stabilitySubnet = rand(ATTESTATION_SUBNET_COUNT - 1).uint64 - node.attestationSubnets.stabilitySubnetExpirationEpoch = - wallEpoch + getStabilitySubnetLength() + let wallEpoch = node.beaconClock.now().slotOrZero().epoch + node.attestationSubnets.stabilitySubnets.setLen( + node.attachedValidators.count) + for i in 0 ..< node.attachedValidators.count: + node.attestationSubnets.stabilitySubnets[i] = ( + subnet: rand(ATTESTATION_SUBNET_COUNT - 1).uint8, + expiration: wallEpoch + getStabilitySubnetLength()) - node.updateStabilitySubnetMetadata(node.attestationSubnets.stabilitySubnet) + node.updateStabilitySubnetMetadata( + node.attestationSubnets.stabilitySubnets.getStabilitySubnets) # Sets the "current" and "future" attestation subnets. One of these gets - # replaced by get_attestation_subnet_changes() immediately. + # replaced by get_attestation_subnet_changes() immediately. Symmetric so + # that it's robust to the exact timing of when cycleAttestationSubnets() + # first runs, by making that first (effective) swap a no-op. node.attestationSubnets.subscribedSubnets[0] = initialSubnets node.attestationSubnets.subscribedSubnets[1] = initialSubnets + node.attestationSubnets.enabled = true + debug "Initial attestation subnets subscribed", + initialSubnets, + wallEpoch node.installAttestationSubnetHandlers(initialSubnets) proc addMessageHandlers(node: BeaconNode): Future[void] = @@ -457,12 +495,12 @@ proc addMessageHandlers(node: BeaconNode): Future[void] = ) func getTopicSubscriptionEnabled(node: BeaconNode): bool = - node.attestationSubnets.subscribedSubnets[0].len + - node.attestationSubnets.subscribedSubnets[1].len > 0 + node.attestationSubnets.enabled proc removeMessageHandlers(node: BeaconNode): Future[void] = node.attestationSubnets.subscribedSubnets[0] = {} node.attestationSubnets.subscribedSubnets[1] = {} + node.attestationSubnets.enabled = false doAssert not node.getTopicSubscriptionEnabled() var unsubscriptions = mapIt( @@ -526,8 +564,9 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} = syncQueueLen await node.removeMessageHandlers() - # Subscription or unsubscription might have occurred; recheck - if slot.isEpoch and node.getTopicSubscriptionEnabled: + # Subscription or unsubscription might have occurred; recheck. + if node.getTopicSubscriptionEnabled: + # This exits early all but one call each epoch. await node.cycleAttestationSubnets(slot) proc onSlotEnd(node: BeaconNode, slot, nextSlot: Slot): Future[void] = diff --git a/beacon_chain/nimbus_validator_client.nim b/beacon_chain/nimbus_validator_client.nim index 296a0e44b..bccbe00f7 100644 --- a/beacon_chain/nimbus_validator_client.nim +++ b/beacon_chain/nimbus_validator_client.nim @@ -87,7 +87,6 @@ proc getValidatorDutiesForEpoch(vc: ValidatorClient, epoch: Epoch) {.gcsafe, asy vc.attestationsForEpoch.clear() await getAttesterDutiesForEpoch(epoch) # obtain the attestation duties this VC should do during the next epoch - # TODO currently we aren't making use of this but perhaps we should await getAttesterDutiesForEpoch(epoch + 1) # for now we will get the fork each time we update the validator duties for each epoch @@ -129,6 +128,9 @@ proc onSlotStart(vc: ValidatorClient, lastSlot, scheduledSlot: Slot) {.gcsafe, a # 1 slot earlier because there are a few back-and-forth requests which # could take up time for attesting... Perhaps this should be called more # than once per epoch because of forks & other events... + # + # calling it before epoch n starts means one can't ensure knowing about + # epoch n+1. if slot.isEpoch: await getValidatorDutiesForEpoch(vc, epoch) @@ -264,6 +266,16 @@ proc onSlotStart(vc: ValidatorClient, lastSlot, scheduledSlot: Slot) {.gcsafe, a # need similar amounts of memory. GC_fullCollect() + if (slot - 2).isEpoch and (slot.epoch + 1) in vc.attestationsForEpoch: + for slot, attesterDuties in vc.attestationsForEpoch[slot.epoch + 1].pairs: + for ad in attesterDuties: + let + validator = vc.attachedValidators.validators[ad.public_key] + sig = await validator.getSlotSig( + vc.fork, vc.beaconGenesis.genesis_validators_root, slot) + discard await vc.client.post_v1_validator_beacon_committee_subscriptions( + ad.committee_index, ad.slot, true, ad.public_key, sig) + addTimer(nextSlotStart) do (p: pointer): asyncCheck vc.onSlotStart(slot, nextSlot) diff --git a/beacon_chain/rpc/validator_api.nim b/beacon_chain/rpc/validator_api.nim index fba45e776..762b80cb4 100644 --- a/beacon_chain/rpc/validator_api.nim +++ b/beacon_chain/rpc/validator_api.nim @@ -15,7 +15,7 @@ import chronicles, # Local modules - ../spec/[datatypes, digest, crypto, helpers], + ../spec/[datatypes, digest, crypto, helpers, network, signatures], ../spec/eth2_apis/callsigs_types, ../block_pools/[chain_dag, spec_cache], ../ssz/merkleization, ../beacon_node_common, ../beacon_node_types, ../attestation_pool, @@ -115,5 +115,39 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) = rpcServer.rpc("post_v1_validator_beacon_committee_subscriptions") do ( committee_index: CommitteeIndex, slot: Slot, aggregator: bool, validator_pubkey: ValidatorPubKey, slot_signature: ValidatorSig) -> bool: - debug "post_v1_validator_beacon_committee_subscriptions" - raise newException(CatchableError, "Not implemented") + debug "post_v1_validator_beacon_committee_subscriptions", + committee_index, slot + if committee_index.uint64 >= ATTESTATION_SUBNET_COUNT.uint64: + raise newException(CatchableError, + "Invalid committee index") + + if node.syncManager.inProgress: + raise newException(CatchableError, + "Beacon node is currently syncing and not serving request on that endpoint") + + let wallSlot = node.beaconClock.now.slotOrZero + if wallSlot > slot + 1: + raise newException(CatchableError, + "Past slot requested") + + let epoch = slot.epoch + if epoch >= wallSlot.epoch and epoch - wallSlot.epoch > 1: + raise newException(CatchableError, + "Slot requested not in current or next wall-slot epoch") + + if not verify_slot_signature( + node.chainDag.headState.data.data.fork, + node.chainDag.headState.data.data.genesis_validators_root, + slot, validator_pubkey, slot_signature): + raise newException(CatchableError, + "Invalid slot signature") + + let subnet = committee_index.uint8 + if subnet notin node.attestationSubnets.subscribedSubnets[0] and + subnet notin node.attestationSubnets.subscribedSubnets[1]: + await node.network.subscribe(getAttestationTopic( + node.forkDigest, subnet)) + + # But it might only be in current + node.attestationSubnets.subscribedSubnets[0].incl subnet + node.attestationSubnets.subscribedSubnets[1].incl subnet diff --git a/beacon_chain/spec/datatypes.nim b/beacon_chain/spec/datatypes.nim index 766de946e..c76ff7dae 100644 --- a/beacon_chain/spec/datatypes.nim +++ b/beacon_chain/spec/datatypes.nim @@ -457,9 +457,10 @@ type beacon_proposer_indices*: Table[Slot, Option[ValidatorIndex]] AttestationSubnets* = object + enabled*: bool + nextCycleEpoch*: Epoch subscribedSubnets*: array[2, set[uint8]] - stabilitySubnet*: uint64 - stabilitySubnetExpirationEpoch*: Epoch + stabilitySubnets*: seq[tuple[subnet: uint8, expiration: Epoch]] # This matches the mutable state of the Solidity deposit contract # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/solidity_deposit_contract/deposit_contract.sol diff --git a/beacon_chain/spec/network.nim b/beacon_chain/spec/network.nim index 64d6404b3..cad530e30 100644 --- a/beacon_chain/spec/network.nim +++ b/beacon_chain/spec/network.nim @@ -124,42 +124,54 @@ proc getStabilitySubnetLength*(): uint64 = proc get_attestation_subnet_changes*( state: BeaconState, attachedValidators: openArray[ValidatorIndex], - prevAttestationSubnets: AttestationSubnets, epoch: Epoch): + prevAttestationSubnets: AttestationSubnets): tuple[a: AttestationSubnets, b: set[uint8], c: set[uint8]] = static: doAssert ATTESTATION_SUBNET_COUNT == 64 # Fits in a set[uint8] doAssert attachedValidators.len > 0 - var attestationSubnets = prevAttestationSubnets + # Guaranteed equivalent to wallSlot by cycleAttestationSubnets(), especially + # since it'll try to run early in epochs, avoiding race conditions. + let epoch = state.slot.epoch # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability - let prevStabilitySubnet = {attestationSubnets.stabilitySubnet.uint8} - if epoch >= attestationSubnets.stabilitySubnetExpirationEpoch: - attestationSubnets.stabilitySubnet = - rand(ATTESTATION_SUBNET_COUNT - 1).uint64 - attestationSubnets.stabilitySubnetExpirationEpoch = - epoch + getStabilitySubnetLength() + var + attestationSubnets = prevAttestationSubnets + prevStabilitySubnets: set[uint8] = {} + stabilitySet: set[uint8] = {} + for i in 0 ..< attestationSubnets.stabilitySubnets.len: + static: doAssert ATTESTATION_SUBNET_COUNT <= high(uint8) + prevStabilitySubnets.incl attestationSubnets.stabilitySubnets[i].subnet + + if epoch >= attestationSubnets.stabilitySubnets[i].expiration: + attestationSubnets.stabilitySubnets[i].subnet = + rand(ATTESTATION_SUBNET_COUNT - 1).uint8 + attestationSubnets.stabilitySubnets[i].expiration = + epoch + getStabilitySubnetLength() + + stabilitySet.incl attestationSubnets.stabilitySubnets[i].subnet var nextEpochSubnets: set[uint8] for it in get_committee_assignments( - state, state.slot.epoch + 1, attachedValidators.toHashSet): + state, epoch + 1, attachedValidators.toHashSet): nextEpochSubnets.incl it.subnetIndex.uint8 doAssert nextEpochSubnets.len <= attachedValidators.len + nextEpochSubnets.incl stabilitySet let epochParity = epoch mod 2 - stabilitySet = {attestationSubnets.stabilitySubnet.uint8} currentEpochSubnets = attestationSubnets.subscribedSubnets[1 - epochParity] expiringSubnets = - (prevStabilitySubnet + + (prevStabilitySubnets + attestationSubnets.subscribedSubnets[epochParity]) - nextEpochSubnets - currentEpochSubnets - stabilitySet newSubnets = - (nextEpochSubnets + stabilitySet) - - (currentEpochSubnets + prevStabilitySubnet) + nextEpochSubnets - (currentEpochSubnets + prevStabilitySubnets) doAssert newSubnets.len <= attachedValidators.len + 1 + doAssert (expiringSubnets * currentEpochSubnets).len == 0 + doAssert newSubnets <= nextEpochSubnets - attestationSubnets.subscribedSubnets[epochParity] = newSubnets + attestationSubnets.subscribedSubnets[epochParity] = nextEpochSubnets (attestationSubnets, expiringSubnets, newSubnets)