From 2afe2802b6005edc62710ec5b2b2fec4beb503da Mon Sep 17 00:00:00 2001 From: tersec Date: Mon, 9 Aug 2021 12:54:45 +0000 Subject: [PATCH] altair topic switching (#2767) * altair topic switching * remove validate{Committee,Validator}IndexOr unused within branch --- beacon_chain/beacon_node_common.nim | 7 + .../consensus_object_pools/blockchain_dag.nim | 10 +- beacon_chain/networking/eth2_network.nim | 17 +- beacon_chain/nimbus_beacon_node.nim | 336 ++++++++++++------ beacon_chain/spec/beaconstate.nim | 12 + .../spec/forkedbeaconstate_helpers.nim | 18 + beacon_chain/sync/sync_protocol.nim | 2 +- 7 files changed, 280 insertions(+), 122 deletions(-) diff --git a/beacon_chain/beacon_node_common.nim b/beacon_chain/beacon_node_common.nim index 93cb394a6..f4a9bbf4f 100644 --- a/beacon_chain/beacon_node_common.nim +++ b/beacon_chain/beacon_node_common.nim @@ -32,6 +32,12 @@ export type RpcServer* = RpcHttpServer + GossipState* = enum + Disconnected + ConnectedToPhase0 + InTransitionToAltair + ConnectedToAltair + BeaconNode* = ref object nickname*: string graffitiBytes*: GraffitiBytes @@ -56,6 +62,7 @@ type blockProcessor*: ref BlockProcessor consensusManager*: ref ConsensusManager attachedValidatorBalanceTotal*: uint64 + gossipState*: GossipState const MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 545f203a1..93b474f5a 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -454,6 +454,9 @@ proc init*(T: type ChainDAGRef, dag +template genesisValidatorsRoot*(dag: ChainDAGRef): Eth2Digest = + getStateField(dag.headState.data, genesis_validators_root) + func getEpochRef*( dag: ChainDAGRef, state: StateData, cache: var StateCache): EpochRef = let @@ -544,8 +547,11 @@ func stateCheckpoint*(bs: BlockSlot): BlockSlot = bs = bs.parentOrSlot bs -proc forkDigestAtSlot*(dag: ChainDAGRef, slot: Slot): ForkDigest = - if slot.epoch < dag.cfg.ALTAIR_FORK_EPOCH: +template forkAtEpoch*(dag: ChainDAGRef, epoch: Epoch): Fork = + forkAtEpoch(dag.cfg, epoch) + +proc forkDigestAtEpoch*(dag: ChainDAGRef, epoch: Epoch): ForkDigest = + if epoch < dag.cfg.ALTAIR_FORK_EPOCH: dag.forkDigests.phase0 else: dag.forkDigests.altair diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 1aad2ce2c..dbe2fb050 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -1825,27 +1825,26 @@ proc broadcast*(node: Eth2Node, topic: string, msg: auto) = except IOError as exc: raiseAssert exc.msg # TODO in-memory compression shouldn't fail -proc subscribeAttestationSubnets*( - node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT]) {. - raises: [Defect, CatchableError].} = +proc subscribeAttestationSubnets*(node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT], + forkDigest: ForkDigest) + {.raises: [Defect, CatchableError].} = # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation # nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable for subnet_id, enabled in subnets: if enabled: node.subscribe(getAttestationTopic( - node.forkID.fork_digest, SubnetId(subnet_id)), TopicParams.init()) # don't score attestation subnets for now + forkDigest, SubnetId(subnet_id)), TopicParams.init()) # don't score attestation subnets for now -proc unsubscribeAttestationSubnets*( - node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT]) {. - raises: [Defect, CatchableError].} = +proc unsubscribeAttestationSubnets*(node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT], + forkDigest: ForkDigest) + {.raises: [Defect, CatchableError].} = # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation # nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable for subnet_id, enabled in subnets: if enabled: - node.unsubscribe(getAttestationTopic( - node.forkID.fork_digest, SubnetId(subnet_id))) + node.unsubscribe(getAttestationTopic(forkDigest, SubnetId(subnet_id))) proc updateStabilitySubnetMetadata*( node: Eth2Node, attnets: BitArray[ATTESTATION_SUBNET_COUNT]) = diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 3778500fb..ed2b6eb36 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -35,10 +35,11 @@ import slashing_protection, keystore_management], ./sync/[sync_manager, sync_protocol, request_manager], ./rpc/[rest_api, rpc_api], + ./spec/datatypes/[altair, phase0], ./spec/[ - datatypes/phase0, datatypes/altair, digest, crypto, - forkedbeaconstate_helpers, beaconstate, eth2_apis/rpc_beacon_client, - helpers, network, presets, weak_subjectivity, signatures], + digest, crypto, forkedbeaconstate_helpers, beaconstate, + eth2_apis/rpc_beacon_client, helpers, network, presets, + weak_subjectivity, signatures], ./consensus_object_pools/[ blockchain_dag, block_quarantine, block_clearance, block_pools_types, attestation_pool, exit_pool, spec_cache], @@ -352,6 +353,7 @@ proc init*(T: type BeaconNode, db: db, config: config, dag: dag, + gossipState: GossipState.Disconnected, quarantine: quarantine, attestationPool: attestationPool, attachedValidators: validatorPool, @@ -368,17 +370,23 @@ proc init*(T: type BeaconNode, # set topic validation routine network.setValidTopics( block: - # TODO altair-transition var topics = @[ getBeaconBlocksTopic(network.forkDigests.phase0), getAttesterSlashingsTopic(network.forkDigests.phase0), getProposerSlashingsTopic(network.forkDigests.phase0), getVoluntaryExitsTopic(network.forkDigests.phase0), - getAggregateAndProofsTopic(network.forkDigests.phase0) + getAggregateAndProofsTopic(network.forkDigests.phase0), + + getBeaconBlocksTopic(dag.forkDigests.altair), + getAttesterSlashingsTopic(network.forkDigests.altair), + getProposerSlashingsTopic(network.forkDigests.altair), + getVoluntaryExitsTopic(network.forkDigests.altair), + getAggregateAndProofsTopic(network.forkDigests.altair) ] for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT: topics &= getAttestationTopic(network.forkDigests.phase0, SubnetId(subnet_id)) + topics &= getAttestationTopic(network.forkDigests.altair, SubnetId(subnet_id)) topics) if node.config.inProcessValidators: @@ -392,9 +400,6 @@ proc init*(T: type BeaconNode, except Exception as exc: raiseAssert exc.msg node.addRemoteValidators() - # This merely configures the BeaconSync - # The traffic will be started when we join the network. - # TODO altair-transition network.initBeaconSync(dag, getTime) node.updateValidatorMetrics() @@ -604,8 +609,20 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} = unsubscribeSubnets = prevAllSubnets - allSubnets subscribeSubnets = allSubnets - prevAllSubnets - node.network.unsubscribeAttestationSubnets(unsubscribeSubnets) - node.network.subscribeAttestationSubnets(subscribeSubnets) + case node.gossipState + of GossipState.Disconnected: + discard + of GossipState.ConnectedToPhase0: + node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, node.dag.forkDigests.phase0) + node.network.subscribeAttestationSubnets(subscribeSubnets, node.dag.forkDigests.phase0) + of GossipState.InTransitionToAltair: + node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, node.dag.forkDigests.phase0) + node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, node.dag.forkDigests.altair) + node.network.subscribeAttestationSubnets(subscribeSubnets, node.dag.forkDigests.phase0) + node.network.subscribeAttestationSubnets(subscribeSubnets, node.dag.forkDigests.altair) + of GossipState.ConnectedToAltair: + node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, node.dag.forkDigests.altair) + node.network.subscribeAttestationSubnets(subscribeSubnets, node.dag.forkDigests.altair) debug "Attestation subnets", wallSlot, @@ -642,7 +659,8 @@ proc getInitialAggregateSubnets(node: BeaconNode): Table[SubnetId, Slot] = mergeAggregateSubnets(wallEpoch) mergeAggregateSubnets(wallEpoch + 1) -proc subscribeAttestationSubnetHandlers(node: BeaconNode) {. +proc subscribeAttestationSubnetHandlers(node: BeaconNode, + forkDigest: ForkDigest) {. raises: [Defect, CatchableError].} = # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability # TODO: @@ -690,84 +708,102 @@ proc subscribeAttestationSubnetHandlers(node: BeaconNode) {. aggregateSubnets = subnetLog(node.attestationSubnets.aggregateSubnets), stabilitySubnets = subnetLog(stabilitySubnets) node.network.subscribeAttestationSubnets( - node.attestationSubnets.aggregateSubnets + stabilitySubnets) + node.attestationSubnets.aggregateSubnets + stabilitySubnets, + forkDigest) -proc addMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} = - # inspired by lighthouse research here - # https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c#file-generate-scoring-params-py - const - blocksTopicParams = TopicParams( - topicWeight: 0.5, - timeInMeshWeight: 0.03333333333333333, - timeInMeshQuantum: chronos.seconds(12), - timeInMeshCap: 300, - firstMessageDeliveriesWeight: 1.1471603557060206, - firstMessageDeliveriesDecay: 0.9928302477768374, - firstMessageDeliveriesCap: 34.86870846001471, - meshMessageDeliveriesWeight: -458.31054878249114, - meshMessageDeliveriesDecay: 0.9716279515771061, - meshMessageDeliveriesThreshold: 0.6849191409056553, - meshMessageDeliveriesCap: 2.054757422716966, - meshMessageDeliveriesActivation: chronos.seconds(384), - meshMessageDeliveriesWindow: chronos.seconds(2), - meshFailurePenaltyWeight: -458.31054878249114 , - meshFailurePenaltyDecay: 0.9716279515771061, - invalidMessageDeliveriesWeight: -214.99999999999994, - invalidMessageDeliveriesDecay: 0.9971259067705325 - ) - aggregateTopicParams = TopicParams( - topicWeight: 0.5, - timeInMeshWeight: 0.03333333333333333, - timeInMeshQuantum: chronos.seconds(12), - timeInMeshCap: 300, - firstMessageDeliveriesWeight: 0.10764904539552399, - firstMessageDeliveriesDecay: 0.8659643233600653, - firstMessageDeliveriesCap: 371.5778421725158, - meshMessageDeliveriesWeight: -0.07538533073670682, - meshMessageDeliveriesDecay: 0.930572040929699, - meshMessageDeliveriesThreshold: 53.404248450179836, - meshMessageDeliveriesCap: 213.61699380071934, - meshMessageDeliveriesActivation: chronos.seconds(384), - meshMessageDeliveriesWindow: chronos.seconds(2), - meshFailurePenaltyWeight: -0.07538533073670682 , - meshFailurePenaltyDecay: 0.930572040929699, - invalidMessageDeliveriesWeight: -214.99999999999994, - invalidMessageDeliveriesDecay: 0.9971259067705325 - ) - basicParams = TopicParams.init() +# inspired by lighthouse research here +# https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c#file-generate-scoring-params-py +const + blocksTopicParams = TopicParams( + topicWeight: 0.5, + timeInMeshWeight: 0.03333333333333333, + timeInMeshQuantum: chronos.seconds(12), + timeInMeshCap: 300, + firstMessageDeliveriesWeight: 1.1471603557060206, + firstMessageDeliveriesDecay: 0.9928302477768374, + firstMessageDeliveriesCap: 34.86870846001471, + meshMessageDeliveriesWeight: -458.31054878249114, + meshMessageDeliveriesDecay: 0.9716279515771061, + meshMessageDeliveriesThreshold: 0.6849191409056553, + meshMessageDeliveriesCap: 2.054757422716966, + meshMessageDeliveriesActivation: chronos.seconds(384), + meshMessageDeliveriesWindow: chronos.seconds(2), + meshFailurePenaltyWeight: -458.31054878249114 , + meshFailurePenaltyDecay: 0.9716279515771061, + invalidMessageDeliveriesWeight: -214.99999999999994, + invalidMessageDeliveriesDecay: 0.9971259067705325 + ) + aggregateTopicParams = TopicParams( + topicWeight: 0.5, + timeInMeshWeight: 0.03333333333333333, + timeInMeshQuantum: chronos.seconds(12), + timeInMeshCap: 300, + firstMessageDeliveriesWeight: 0.10764904539552399, + firstMessageDeliveriesDecay: 0.8659643233600653, + firstMessageDeliveriesCap: 371.5778421725158, + meshMessageDeliveriesWeight: -0.07538533073670682, + meshMessageDeliveriesDecay: 0.930572040929699, + meshMessageDeliveriesThreshold: 53.404248450179836, + meshMessageDeliveriesCap: 213.61699380071934, + meshMessageDeliveriesActivation: chronos.seconds(384), + meshMessageDeliveriesWindow: chronos.seconds(2), + meshFailurePenaltyWeight: -0.07538533073670682 , + meshFailurePenaltyDecay: 0.930572040929699, + invalidMessageDeliveriesWeight: -214.99999999999994, + invalidMessageDeliveriesDecay: 0.9971259067705325 + ) + basicParams = TopicParams.init() - static: - # compile time validation - blocksTopicParams.validateParameters().tryGet() - aggregateTopicParams.validateParameters().tryGet() - basicParams.validateParameters.tryGet() +static: + # compile time validation + blocksTopicParams.validateParameters().tryGet() + aggregateTopicParams.validateParameters().tryGet() + basicParams.validateParameters.tryGet() - # TODO altair-transition - node.network.subscribe( - getBeaconBlocksTopic(node.dag.forkDigests.phase0), blocksTopicParams, enableTopicMetrics = true) - node.network.subscribe(getAttesterSlashingsTopic(node.dag.forkDigests.phase0), basicParams) - node.network.subscribe(getProposerSlashingsTopic(node.dag.forkDigests.phase0), basicParams) - node.network.subscribe(getVoluntaryExitsTopic(node.dag.forkDigests.phase0), basicParams) - node.network.subscribe(getAggregateAndProofsTopic(node.dag.forkDigests.phase0), aggregateTopicParams, enableTopicMetrics = true) - node.subscribeAttestationSubnetHandlers() +proc addPhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) + {.raises: [Defect, CatchableError].} = + node.network.subscribe(getBeaconBlocksTopic(forkDigest), blocksTopicParams, enableTopicMetrics = true) + node.network.subscribe(getAttesterSlashingsTopic(forkDigest), basicParams) + node.network.subscribe(getProposerSlashingsTopic(forkDigest), basicParams) + node.network.subscribe(getVoluntaryExitsTopic(forkDigest), basicParams) + node.network.subscribe(getAggregateAndProofsTopic(forkDigest), aggregateTopicParams, enableTopicMetrics = true) + + node.subscribeAttestationSubnetHandlers(forkDigest) + +proc addPhase0MessageHandlers(node: BeaconNode) + {.raises: [Defect, CatchableError].} = + addPhase0MessageHandlers(node, node.dag.forkDigests.phase0) + +proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) + {.raises: [Defect, CatchableError].} = + node.network.unsubscribe(getBeaconBlocksTopic(forkDigest)) + node.network.unsubscribe(getVoluntaryExitsTopic(forkDigest)) + node.network.unsubscribe(getProposerSlashingsTopic(forkDigest)) + node.network.unsubscribe(getAttesterSlashingsTopic(forkDigest)) + node.network.unsubscribe(getAggregateAndProofsTopic(forkDigest)) + + for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT: + node.network.unsubscribe( + getAttestationTopic(forkDigest, SubnetId(subnet_id))) + +proc removePhase0MessageHandlers(node: BeaconNode) + {.raises: [Defect, CatchableError].} = + removePhase0MessageHandlers(node, node.dag.forkDigests.phase0) + +proc addAltairMessageHandlers(node: BeaconNode, slot: Slot) + {.raises: [Defect, CatchableError].} = + node.addPhase0MessageHandlers(node.dag.forkDigests.altair) + +proc removeAltairMessageHandlers(node: BeaconNode) + {.raises: [Defect, CatchableError].} = + node.removePhase0MessageHandlers(node.dag.forkDigests.altair) func getTopicSubscriptionEnabled(node: BeaconNode): bool = node.attestationSubnets.enabled -proc removeMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} = - node.attestationSubnets.enabled = false - doAssert not node.getTopicSubscriptionEnabled() - - # TODO altair-transition - node.network.unsubscribe(getBeaconBlocksTopic(node.dag.forkDigests.phase0)) - node.network.unsubscribe(getVoluntaryExitsTopic(node.dag.forkDigests.phase0)) - node.network.unsubscribe(getProposerSlashingsTopic(node.dag.forkDigests.phase0)) - node.network.unsubscribe(getAttesterSlashingsTopic(node.dag.forkDigests.phase0)) - node.network.unsubscribe(getAggregateAndProofsTopic(node.dag.forkDigests.phase0)) - - for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT: - node.network.unsubscribe( - getAttestationTopic(node.dag.forkDigests.phase0, SubnetId(subnet_id))) +proc removeAllMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} = + node.removePhase0MessageHandlers() + node.removeAltairMessageHandlers() proc setupDoppelgangerDetection(node: BeaconNode, slot: Slot) = # When another client's already running, this is very likely to detect @@ -796,12 +832,7 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.raises: [Defect, Catchab let syncQueueLen = node.syncManager.syncQueueLen - topicSubscriptionEnabled = node.getTopicSubscriptionEnabled() - if - # Don't enable if already enabled; to avoid race conditions requires care, - # but isn't crucial, as this condition spuriously fail, but the next time, - # should properly succeed. - not topicSubscriptionEnabled and + targetGossipState = # SyncManager forward sync by default runs until maxHeadAge slots, or one # epoch range is achieved. This particular condition has a couple caveats # including that under certain conditions, debtsCount appears to push len @@ -811,28 +842,80 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.raises: [Defect, Catchab # are left. Furthermore, even when 0 peers are being used, this won't get # to 0 slots in syncQueueLen, but that's a vacuous condition given that a # networking interaction cannot happen under such circumstances. - syncQueueLen < TOPIC_SUBSCRIBE_THRESHOLD_SLOTS: - # When node.cycleAttestationSubnets() is enabled more properly, integrate - # this into the node.cycleAttestationSubnets() call. + if syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS: + GossipState.Disconnected + elif slot.epoch + 1 < node.dag.cfg.ALTAIR_FORK_EPOCH: + GossipState.ConnectedToPhase0 + elif slot.epoch >= node.dag.cfg.ALTAIR_FORK_EPOCH: + GossipState.ConnectedToAltair + else: + GossipState.InTransitionToAltair + + if node.gossipState == GossipState.Disconnected and + targetGossipState != GossipState.Disconnected: + # We are synced, so we will connect debug "Enabling topic subscriptions", wallSlot = slot, headSlot = node.dag.head.slot, syncQueueLen node.setupDoppelgangerDetection(slot) - node.addMessageHandlers() - doAssert node.getTopicSubscriptionEnabled() - elif - topicSubscriptionEnabled and - syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and - # Filter out underflow from debtsCount; plausible queue lengths can't - # exceed wallslot, with safety margin. - syncQueueLen < 2 * slot.uint64: - debug "Disabling topic subscriptions", - wallSlot = slot, - headSlot = node.dag.head.slot, - syncQueueLen - node.removeMessageHandlers() + + block addRemoveHandlers: + case targetGossipState + of GossipState.Disconnected: + case node.gossipState: + of GossipState.Disconnected: break + else: + if syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and + # Filter out underflow from debtsCount; plausible queue lengths can't + # exceed wallslot, with safety margin. + syncQueueLen < 2 * slot.uint64: + debug "Disabling topic subscriptions", + wallSlot = slot, + headSlot = node.dag.head.slot, + syncQueueLen + node.removeAllMessageHandlers() + node.gossipState = GossipState.Disconnected + break + + of GossipState.ConnectedToPhase0: + case node.gossipState: + of GossipState.ConnectedToPhase0: break + of GossipState.Disconnected: + node.addPhase0MessageHandlers() + of GossipState.InTransitionToAltair: + warn "Unexpected clock regression during altair transition" + node.removeAltairMessageHandlers() + of GossipState.ConnectedToAltair: + warn "Unexpected clock regression during altair transition" + node.removeAltairMessageHandlers() + node.addPhase0MessageHandlers() + + of GossipState.InTransitionToAltair: + case node.gossipState: + of GossipState.InTransitionToAltair: break + of GossipState.Disconnected: + node.addPhase0MessageHandlers() + node.addAltairMessageHandlers(slot) + of GossipState.ConnectedToPhase0: + node.addAltairMessageHandlers(slot) + of GossipState.ConnectedToAltair: + warn "Unexpected clock regression during altair transition" + node.addPhase0MessageHandlers() + + of GossipState.ConnectedToAltair: + case node.gossipState: + of GossipState.ConnectedToAltair: break + of GossipState.Disconnected: + node.addAltairMessageHandlers(slot) + of GossipState.ConnectedToPhase0: + node.removePhase0MessageHandlers() + node.addAltairMessageHandlers(slot) + of GossipState.InTransitionToAltair: + node.removePhase0MessageHandlers() + + node.gossipState = targetGossipState # Subscription or unsubscription might have occurred; recheck. Since Nimbus # initially subscribes to all subnets, simply do not ever cycle attestation @@ -1047,7 +1130,8 @@ proc startSyncManager(node: BeaconNode) = true proc onDeletePeer(peer: Peer) = - if peer.connectionState notin {Disconnecting, ConnectionState.Disconnected}: + if peer.connectionState notin {ConnectionState.Disconnecting, + ConnectionState.Disconnected}: if peer.score < PeerScoreLowLimit: debug "Peer was removed from PeerPool due to low score", peer = peer, peer_score = peer.score, score_low_limit = PeerScoreLowLimit, @@ -1098,7 +1182,7 @@ proc installMessageValidators(node: BeaconNode) = # These validators stay around the whole time, regardless of which specific # subnets are subscribed to during any given epoch. - # TODO altair-transition + # TODO altair-transition, well, without massive copy/pasting (extract to template or etc) for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64: closureScope: let subnet_id = SubnetId(it) @@ -1133,6 +1217,41 @@ proc installMessageValidators(node: BeaconNode) = proc (signedVoluntaryExit: SignedVoluntaryExit): ValidationResult = node.processor[].voluntaryExitValidator(signedVoluntaryExit)) + # TODO copy/paste starts here; templatize whole thing + for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64: + closureScope: + let subnet_id = SubnetId(it) + node.network.addAsyncValidator( + getAttestationTopic(node.dag.forkDigests.altair, subnet_id), + # This proc needs to be within closureScope; don't lift out of loop. + proc(attestation: Attestation): Future[ValidationResult] = + node.processor.attestationValidator(attestation, subnet_id)) + + node.network.addAsyncValidator( + getAggregateAndProofsTopic(node.dag.forkDigests.altair), + proc(signedAggregateAndProof: SignedAggregateAndProof): Future[ValidationResult] = + node.processor.aggregateValidator(signedAggregateAndProof)) + + node.network.addValidator( + getBeaconBlocksTopic(node.dag.forkDigests.altair), + proc (signedBlock: altair.SignedBeaconBlock): ValidationResult = + node.processor[].blockValidator(signedBlock)) + + node.network.addValidator( + getAttesterSlashingsTopic(node.dag.forkDigests.altair), + proc (attesterSlashing: AttesterSlashing): ValidationResult = + node.processor[].attesterSlashingValidator(attesterSlashing)) + + node.network.addValidator( + getProposerSlashingsTopic(node.dag.forkDigests.altair), + proc (proposerSlashing: ProposerSlashing): ValidationResult = + node.processor[].proposerSlashingValidator(proposerSlashing)) + + node.network.addValidator( + getVoluntaryExitsTopic(node.dag.forkDigests.altair), + proc (signedVoluntaryExit: SignedVoluntaryExit): ValidationResult = + node.processor[].voluntaryExitValidator(signedVoluntaryExit)) + proc stop*(node: BeaconNode) = bnStatus = BeaconNodeStatus.Stopping notice "Graceful shutdown" @@ -1173,10 +1292,7 @@ proc run*(node: BeaconNode) {.raises: [Defect, CatchableError].} = node.requestManager.start() node.startSyncManager() - if not startTime.toSlot().afterGenesis: - node.setupDoppelgangerDetection(startTime.slotOrZero()) - node.addMessageHandlers() - doAssert node.getTopicSubscriptionEnabled() + node.updateGossipStatus(startTime.slotOrZero) ## Ctrl+C handling proc controlCHandler() {.noconv.} = diff --git a/beacon_chain/spec/beaconstate.nim b/beacon_chain/spec/beaconstate.nim index 363ac2ce7..c0bce7a45 100644 --- a/beacon_chain/spec/beaconstate.nim +++ b/beacon_chain/spec/beaconstate.nim @@ -203,6 +203,18 @@ proc slash_validator*( func genesis_time_from_eth1_timestamp*(cfg: RuntimeConfig, eth1_timestamp: uint64): uint64 = eth1_timestamp + cfg.GENESIS_DELAY +func genesisFork*(cfg: RuntimeConfig): Fork = + Fork( + previous_version: cfg.GENESIS_FORK_VERSION, + current_version: cfg.GENESIS_FORK_VERSION, + epoch: GENESIS_EPOCH) + +func altairFork*(cfg: RuntimeConfig): Fork = + Fork( + previous_version: cfg.GENESIS_FORK_VERSION, + current_version: cfg.ALTAIR_FORK_VERSION, + epoch: cfg.ALTAIR_FORK_EPOCH) + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#genesis proc initialize_beacon_state_from_eth1*( cfg: RuntimeConfig, diff --git a/beacon_chain/spec/forkedbeaconstate_helpers.nim b/beacon_chain/spec/forkedbeaconstate_helpers.nim index 33154c5ba..14c43aed9 100644 --- a/beacon_chain/spec/forkedbeaconstate_helpers.nim +++ b/beacon_chain/spec/forkedbeaconstate_helpers.nim @@ -278,3 +278,21 @@ func shortLog*(x: ForkedSignedBeaconBlock | ForkedTrustedSignedBeaconBlock): aut chronicles.formatIt ForkedSignedBeaconBlock: it.shortLog chronicles.formatIt ForkedTrustedSignedBeaconBlock: it.shortLog + +proc forkAtEpoch*(cfg: RuntimeConfig, epoch: Epoch): Fork = + if epoch < cfg.ALTAIR_FORK_EPOCH: + genesisFork(cfg) + else: + altairFork(cfg) + +proc forkVersionAtEpoch*(cfg: RuntimeConfig, epoch: Epoch): Version = + if epoch < cfg.ALTAIR_FORK_EPOCH: + cfg.GENESIS_FORK_VERSION + else: + cfg.ALTAIR_FORK_VERSION + +proc nextForkEpochAtEpoch*(cfg: RuntimeConfig, epoch: Epoch): Epoch = + if epoch < cfg.ALTAIR_FORK_EPOCH: + cfg.ALTAIR_FORK_EPOCH + else: + FAR_FUTURE_EPOCH diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index 1a2dfaeec..13ae195a1 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -130,7 +130,7 @@ proc getCurrentStatus*(state: BeaconSyncNetworkState): StatusMsg {.gcsafe.} = wallTimeSlot = dag.beaconClock.toBeaconTime(wallTime).slotOrZero StatusMsg( - forkDigest: state.dag.forkDigestAtSlot(wallTimeSlot), + forkDigest: state.dag.forkDigestAtEpoch(wallTimeSlot.epoch), finalizedRoot: getStateField(dag.headState.data, finalized_checkpoint).root, finalizedEpoch: