altair topic switching (#2767)

* altair topic switching

* remove validate{Committee,Validator}IndexOr unused within branch
This commit is contained in:
tersec 2021-08-09 12:54:45 +00:00 committed by GitHub
parent b69f566ed1
commit 2afe2802b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 280 additions and 122 deletions

View File

@ -32,6 +32,12 @@ export
type type
RpcServer* = RpcHttpServer RpcServer* = RpcHttpServer
GossipState* = enum
Disconnected
ConnectedToPhase0
InTransitionToAltair
ConnectedToAltair
BeaconNode* = ref object BeaconNode* = ref object
nickname*: string nickname*: string
graffitiBytes*: GraffitiBytes graffitiBytes*: GraffitiBytes
@ -56,6 +62,7 @@ type
blockProcessor*: ref BlockProcessor blockProcessor*: ref BlockProcessor
consensusManager*: ref ConsensusManager consensusManager*: ref ConsensusManager
attachedValidatorBalanceTotal*: uint64 attachedValidatorBalanceTotal*: uint64
gossipState*: GossipState
const const
MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT

View File

@ -454,6 +454,9 @@ proc init*(T: type ChainDAGRef,
dag dag
template genesisValidatorsRoot*(dag: ChainDAGRef): Eth2Digest =
getStateField(dag.headState.data, genesis_validators_root)
func getEpochRef*( func getEpochRef*(
dag: ChainDAGRef, state: StateData, cache: var StateCache): EpochRef = dag: ChainDAGRef, state: StateData, cache: var StateCache): EpochRef =
let let
@ -544,8 +547,11 @@ func stateCheckpoint*(bs: BlockSlot): BlockSlot =
bs = bs.parentOrSlot bs = bs.parentOrSlot
bs bs
proc forkDigestAtSlot*(dag: ChainDAGRef, slot: Slot): ForkDigest = template forkAtEpoch*(dag: ChainDAGRef, epoch: Epoch): Fork =
if slot.epoch < dag.cfg.ALTAIR_FORK_EPOCH: forkAtEpoch(dag.cfg, epoch)
proc forkDigestAtEpoch*(dag: ChainDAGRef, epoch: Epoch): ForkDigest =
if epoch < dag.cfg.ALTAIR_FORK_EPOCH:
dag.forkDigests.phase0 dag.forkDigests.phase0
else: else:
dag.forkDigests.altair dag.forkDigests.altair

View File

@ -1825,27 +1825,26 @@ proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
except IOError as exc: except IOError as exc:
raiseAssert exc.msg # TODO in-memory compression shouldn't fail raiseAssert exc.msg # TODO in-memory compression shouldn't fail
proc subscribeAttestationSubnets*( proc subscribeAttestationSubnets*(node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT],
node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT]) {. forkDigest: ForkDigest)
raises: [Defect, CatchableError].} = {.raises: [Defect, CatchableError].} =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation
# nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable # nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable
for subnet_id, enabled in subnets: for subnet_id, enabled in subnets:
if enabled: if enabled:
node.subscribe(getAttestationTopic( node.subscribe(getAttestationTopic(
node.forkID.fork_digest, SubnetId(subnet_id)), TopicParams.init()) # don't score attestation subnets for now forkDigest, SubnetId(subnet_id)), TopicParams.init()) # don't score attestation subnets for now
proc unsubscribeAttestationSubnets*( proc unsubscribeAttestationSubnets*(node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT],
node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT]) {. forkDigest: ForkDigest)
raises: [Defect, CatchableError].} = {.raises: [Defect, CatchableError].} =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation
# nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable # nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable
for subnet_id, enabled in subnets: for subnet_id, enabled in subnets:
if enabled: if enabled:
node.unsubscribe(getAttestationTopic( node.unsubscribe(getAttestationTopic(forkDigest, SubnetId(subnet_id)))
node.forkID.fork_digest, SubnetId(subnet_id)))
proc updateStabilitySubnetMetadata*( proc updateStabilitySubnetMetadata*(
node: Eth2Node, attnets: BitArray[ATTESTATION_SUBNET_COUNT]) = node: Eth2Node, attnets: BitArray[ATTESTATION_SUBNET_COUNT]) =

View File

@ -35,10 +35,11 @@ import
slashing_protection, keystore_management], slashing_protection, keystore_management],
./sync/[sync_manager, sync_protocol, request_manager], ./sync/[sync_manager, sync_protocol, request_manager],
./rpc/[rest_api, rpc_api], ./rpc/[rest_api, rpc_api],
./spec/datatypes/[altair, phase0],
./spec/[ ./spec/[
datatypes/phase0, datatypes/altair, digest, crypto, digest, crypto, forkedbeaconstate_helpers, beaconstate,
forkedbeaconstate_helpers, beaconstate, eth2_apis/rpc_beacon_client, eth2_apis/rpc_beacon_client, helpers, network, presets,
helpers, network, presets, weak_subjectivity, signatures], weak_subjectivity, signatures],
./consensus_object_pools/[ ./consensus_object_pools/[
blockchain_dag, block_quarantine, block_clearance, block_pools_types, blockchain_dag, block_quarantine, block_clearance, block_pools_types,
attestation_pool, exit_pool, spec_cache], attestation_pool, exit_pool, spec_cache],
@ -352,6 +353,7 @@ proc init*(T: type BeaconNode,
db: db, db: db,
config: config, config: config,
dag: dag, dag: dag,
gossipState: GossipState.Disconnected,
quarantine: quarantine, quarantine: quarantine,
attestationPool: attestationPool, attestationPool: attestationPool,
attachedValidators: validatorPool, attachedValidators: validatorPool,
@ -368,17 +370,23 @@ proc init*(T: type BeaconNode,
# set topic validation routine # set topic validation routine
network.setValidTopics( network.setValidTopics(
block: block:
# TODO altair-transition
var var
topics = @[ topics = @[
getBeaconBlocksTopic(network.forkDigests.phase0), getBeaconBlocksTopic(network.forkDigests.phase0),
getAttesterSlashingsTopic(network.forkDigests.phase0), getAttesterSlashingsTopic(network.forkDigests.phase0),
getProposerSlashingsTopic(network.forkDigests.phase0), getProposerSlashingsTopic(network.forkDigests.phase0),
getVoluntaryExitsTopic(network.forkDigests.phase0), getVoluntaryExitsTopic(network.forkDigests.phase0),
getAggregateAndProofsTopic(network.forkDigests.phase0) getAggregateAndProofsTopic(network.forkDigests.phase0),
getBeaconBlocksTopic(dag.forkDigests.altair),
getAttesterSlashingsTopic(network.forkDigests.altair),
getProposerSlashingsTopic(network.forkDigests.altair),
getVoluntaryExitsTopic(network.forkDigests.altair),
getAggregateAndProofsTopic(network.forkDigests.altair)
] ]
for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT: for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT:
topics &= getAttestationTopic(network.forkDigests.phase0, SubnetId(subnet_id)) topics &= getAttestationTopic(network.forkDigests.phase0, SubnetId(subnet_id))
topics &= getAttestationTopic(network.forkDigests.altair, SubnetId(subnet_id))
topics) topics)
if node.config.inProcessValidators: if node.config.inProcessValidators:
@ -392,9 +400,6 @@ proc init*(T: type BeaconNode,
except Exception as exc: raiseAssert exc.msg except Exception as exc: raiseAssert exc.msg
node.addRemoteValidators() node.addRemoteValidators()
# This merely configures the BeaconSync
# The traffic will be started when we join the network.
# TODO altair-transition
network.initBeaconSync(dag, getTime) network.initBeaconSync(dag, getTime)
node.updateValidatorMetrics() node.updateValidatorMetrics()
@ -604,8 +609,20 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} =
unsubscribeSubnets = prevAllSubnets - allSubnets unsubscribeSubnets = prevAllSubnets - allSubnets
subscribeSubnets = allSubnets - prevAllSubnets subscribeSubnets = allSubnets - prevAllSubnets
node.network.unsubscribeAttestationSubnets(unsubscribeSubnets) case node.gossipState
node.network.subscribeAttestationSubnets(subscribeSubnets) of GossipState.Disconnected:
discard
of GossipState.ConnectedToPhase0:
node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, node.dag.forkDigests.phase0)
node.network.subscribeAttestationSubnets(subscribeSubnets, node.dag.forkDigests.phase0)
of GossipState.InTransitionToAltair:
node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, node.dag.forkDigests.phase0)
node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, node.dag.forkDigests.altair)
node.network.subscribeAttestationSubnets(subscribeSubnets, node.dag.forkDigests.phase0)
node.network.subscribeAttestationSubnets(subscribeSubnets, node.dag.forkDigests.altair)
of GossipState.ConnectedToAltair:
node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, node.dag.forkDigests.altair)
node.network.subscribeAttestationSubnets(subscribeSubnets, node.dag.forkDigests.altair)
debug "Attestation subnets", debug "Attestation subnets",
wallSlot, wallSlot,
@ -642,7 +659,8 @@ proc getInitialAggregateSubnets(node: BeaconNode): Table[SubnetId, Slot] =
mergeAggregateSubnets(wallEpoch) mergeAggregateSubnets(wallEpoch)
mergeAggregateSubnets(wallEpoch + 1) mergeAggregateSubnets(wallEpoch + 1)
proc subscribeAttestationSubnetHandlers(node: BeaconNode) {. proc subscribeAttestationSubnetHandlers(node: BeaconNode,
forkDigest: ForkDigest) {.
raises: [Defect, CatchableError].} = raises: [Defect, CatchableError].} =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
# TODO: # TODO:
@ -690,84 +708,102 @@ proc subscribeAttestationSubnetHandlers(node: BeaconNode) {.
aggregateSubnets = subnetLog(node.attestationSubnets.aggregateSubnets), aggregateSubnets = subnetLog(node.attestationSubnets.aggregateSubnets),
stabilitySubnets = subnetLog(stabilitySubnets) stabilitySubnets = subnetLog(stabilitySubnets)
node.network.subscribeAttestationSubnets( node.network.subscribeAttestationSubnets(
node.attestationSubnets.aggregateSubnets + stabilitySubnets) node.attestationSubnets.aggregateSubnets + stabilitySubnets,
forkDigest)
proc addMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} = # inspired by lighthouse research here
# inspired by lighthouse research here # https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c#file-generate-scoring-params-py
# https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c#file-generate-scoring-params-py const
const blocksTopicParams = TopicParams(
blocksTopicParams = TopicParams( topicWeight: 0.5,
topicWeight: 0.5, timeInMeshWeight: 0.03333333333333333,
timeInMeshWeight: 0.03333333333333333, timeInMeshQuantum: chronos.seconds(12),
timeInMeshQuantum: chronos.seconds(12), timeInMeshCap: 300,
timeInMeshCap: 300, firstMessageDeliveriesWeight: 1.1471603557060206,
firstMessageDeliveriesWeight: 1.1471603557060206, firstMessageDeliveriesDecay: 0.9928302477768374,
firstMessageDeliveriesDecay: 0.9928302477768374, firstMessageDeliveriesCap: 34.86870846001471,
firstMessageDeliveriesCap: 34.86870846001471, meshMessageDeliveriesWeight: -458.31054878249114,
meshMessageDeliveriesWeight: -458.31054878249114, meshMessageDeliveriesDecay: 0.9716279515771061,
meshMessageDeliveriesDecay: 0.9716279515771061, meshMessageDeliveriesThreshold: 0.6849191409056553,
meshMessageDeliveriesThreshold: 0.6849191409056553, meshMessageDeliveriesCap: 2.054757422716966,
meshMessageDeliveriesCap: 2.054757422716966, meshMessageDeliveriesActivation: chronos.seconds(384),
meshMessageDeliveriesActivation: chronos.seconds(384), meshMessageDeliveriesWindow: chronos.seconds(2),
meshMessageDeliveriesWindow: chronos.seconds(2), meshFailurePenaltyWeight: -458.31054878249114 ,
meshFailurePenaltyWeight: -458.31054878249114 , meshFailurePenaltyDecay: 0.9716279515771061,
meshFailurePenaltyDecay: 0.9716279515771061, invalidMessageDeliveriesWeight: -214.99999999999994,
invalidMessageDeliveriesWeight: -214.99999999999994, invalidMessageDeliveriesDecay: 0.9971259067705325
invalidMessageDeliveriesDecay: 0.9971259067705325 )
) aggregateTopicParams = TopicParams(
aggregateTopicParams = TopicParams( topicWeight: 0.5,
topicWeight: 0.5, timeInMeshWeight: 0.03333333333333333,
timeInMeshWeight: 0.03333333333333333, timeInMeshQuantum: chronos.seconds(12),
timeInMeshQuantum: chronos.seconds(12), timeInMeshCap: 300,
timeInMeshCap: 300, firstMessageDeliveriesWeight: 0.10764904539552399,
firstMessageDeliveriesWeight: 0.10764904539552399, firstMessageDeliveriesDecay: 0.8659643233600653,
firstMessageDeliveriesDecay: 0.8659643233600653, firstMessageDeliveriesCap: 371.5778421725158,
firstMessageDeliveriesCap: 371.5778421725158, meshMessageDeliveriesWeight: -0.07538533073670682,
meshMessageDeliveriesWeight: -0.07538533073670682, meshMessageDeliveriesDecay: 0.930572040929699,
meshMessageDeliveriesDecay: 0.930572040929699, meshMessageDeliveriesThreshold: 53.404248450179836,
meshMessageDeliveriesThreshold: 53.404248450179836, meshMessageDeliveriesCap: 213.61699380071934,
meshMessageDeliveriesCap: 213.61699380071934, meshMessageDeliveriesActivation: chronos.seconds(384),
meshMessageDeliveriesActivation: chronos.seconds(384), meshMessageDeliveriesWindow: chronos.seconds(2),
meshMessageDeliveriesWindow: chronos.seconds(2), meshFailurePenaltyWeight: -0.07538533073670682 ,
meshFailurePenaltyWeight: -0.07538533073670682 , meshFailurePenaltyDecay: 0.930572040929699,
meshFailurePenaltyDecay: 0.930572040929699, invalidMessageDeliveriesWeight: -214.99999999999994,
invalidMessageDeliveriesWeight: -214.99999999999994, invalidMessageDeliveriesDecay: 0.9971259067705325
invalidMessageDeliveriesDecay: 0.9971259067705325 )
) basicParams = TopicParams.init()
basicParams = TopicParams.init()
static: static:
# compile time validation # compile time validation
blocksTopicParams.validateParameters().tryGet() blocksTopicParams.validateParameters().tryGet()
aggregateTopicParams.validateParameters().tryGet() aggregateTopicParams.validateParameters().tryGet()
basicParams.validateParameters.tryGet() basicParams.validateParameters.tryGet()
# TODO altair-transition proc addPhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest)
node.network.subscribe( {.raises: [Defect, CatchableError].} =
getBeaconBlocksTopic(node.dag.forkDigests.phase0), blocksTopicParams, enableTopicMetrics = true) node.network.subscribe(getBeaconBlocksTopic(forkDigest), blocksTopicParams, enableTopicMetrics = true)
node.network.subscribe(getAttesterSlashingsTopic(node.dag.forkDigests.phase0), basicParams) node.network.subscribe(getAttesterSlashingsTopic(forkDigest), basicParams)
node.network.subscribe(getProposerSlashingsTopic(node.dag.forkDigests.phase0), basicParams) node.network.subscribe(getProposerSlashingsTopic(forkDigest), basicParams)
node.network.subscribe(getVoluntaryExitsTopic(node.dag.forkDigests.phase0), basicParams) node.network.subscribe(getVoluntaryExitsTopic(forkDigest), basicParams)
node.network.subscribe(getAggregateAndProofsTopic(node.dag.forkDigests.phase0), aggregateTopicParams, enableTopicMetrics = true) node.network.subscribe(getAggregateAndProofsTopic(forkDigest), aggregateTopicParams, enableTopicMetrics = true)
node.subscribeAttestationSubnetHandlers()
node.subscribeAttestationSubnetHandlers(forkDigest)
proc addPhase0MessageHandlers(node: BeaconNode)
{.raises: [Defect, CatchableError].} =
addPhase0MessageHandlers(node, node.dag.forkDigests.phase0)
proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest)
{.raises: [Defect, CatchableError].} =
node.network.unsubscribe(getBeaconBlocksTopic(forkDigest))
node.network.unsubscribe(getVoluntaryExitsTopic(forkDigest))
node.network.unsubscribe(getProposerSlashingsTopic(forkDigest))
node.network.unsubscribe(getAttesterSlashingsTopic(forkDigest))
node.network.unsubscribe(getAggregateAndProofsTopic(forkDigest))
for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT:
node.network.unsubscribe(
getAttestationTopic(forkDigest, SubnetId(subnet_id)))
proc removePhase0MessageHandlers(node: BeaconNode)
{.raises: [Defect, CatchableError].} =
removePhase0MessageHandlers(node, node.dag.forkDigests.phase0)
proc addAltairMessageHandlers(node: BeaconNode, slot: Slot)
{.raises: [Defect, CatchableError].} =
node.addPhase0MessageHandlers(node.dag.forkDigests.altair)
proc removeAltairMessageHandlers(node: BeaconNode)
{.raises: [Defect, CatchableError].} =
node.removePhase0MessageHandlers(node.dag.forkDigests.altair)
func getTopicSubscriptionEnabled(node: BeaconNode): bool = func getTopicSubscriptionEnabled(node: BeaconNode): bool =
node.attestationSubnets.enabled node.attestationSubnets.enabled
proc removeMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} = proc removeAllMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} =
node.attestationSubnets.enabled = false node.removePhase0MessageHandlers()
doAssert not node.getTopicSubscriptionEnabled() node.removeAltairMessageHandlers()
# TODO altair-transition
node.network.unsubscribe(getBeaconBlocksTopic(node.dag.forkDigests.phase0))
node.network.unsubscribe(getVoluntaryExitsTopic(node.dag.forkDigests.phase0))
node.network.unsubscribe(getProposerSlashingsTopic(node.dag.forkDigests.phase0))
node.network.unsubscribe(getAttesterSlashingsTopic(node.dag.forkDigests.phase0))
node.network.unsubscribe(getAggregateAndProofsTopic(node.dag.forkDigests.phase0))
for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT:
node.network.unsubscribe(
getAttestationTopic(node.dag.forkDigests.phase0, SubnetId(subnet_id)))
proc setupDoppelgangerDetection(node: BeaconNode, slot: Slot) = proc setupDoppelgangerDetection(node: BeaconNode, slot: Slot) =
# When another client's already running, this is very likely to detect # When another client's already running, this is very likely to detect
@ -796,12 +832,7 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.raises: [Defect, Catchab
let let
syncQueueLen = node.syncManager.syncQueueLen syncQueueLen = node.syncManager.syncQueueLen
topicSubscriptionEnabled = node.getTopicSubscriptionEnabled() targetGossipState =
if
# Don't enable if already enabled; to avoid race conditions requires care,
# but isn't crucial, as this condition spuriously fail, but the next time,
# should properly succeed.
not topicSubscriptionEnabled and
# SyncManager forward sync by default runs until maxHeadAge slots, or one # SyncManager forward sync by default runs until maxHeadAge slots, or one
# epoch range is achieved. This particular condition has a couple caveats # epoch range is achieved. This particular condition has a couple caveats
# including that under certain conditions, debtsCount appears to push len # including that under certain conditions, debtsCount appears to push len
@ -811,28 +842,80 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.raises: [Defect, Catchab
# are left. Furthermore, even when 0 peers are being used, this won't get # are left. Furthermore, even when 0 peers are being used, this won't get
# to 0 slots in syncQueueLen, but that's a vacuous condition given that a # to 0 slots in syncQueueLen, but that's a vacuous condition given that a
# networking interaction cannot happen under such circumstances. # networking interaction cannot happen under such circumstances.
syncQueueLen < TOPIC_SUBSCRIBE_THRESHOLD_SLOTS: if syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS:
# When node.cycleAttestationSubnets() is enabled more properly, integrate GossipState.Disconnected
# this into the node.cycleAttestationSubnets() call. elif slot.epoch + 1 < node.dag.cfg.ALTAIR_FORK_EPOCH:
GossipState.ConnectedToPhase0
elif slot.epoch >= node.dag.cfg.ALTAIR_FORK_EPOCH:
GossipState.ConnectedToAltair
else:
GossipState.InTransitionToAltair
if node.gossipState == GossipState.Disconnected and
targetGossipState != GossipState.Disconnected:
# We are synced, so we will connect
debug "Enabling topic subscriptions", debug "Enabling topic subscriptions",
wallSlot = slot, wallSlot = slot,
headSlot = node.dag.head.slot, headSlot = node.dag.head.slot,
syncQueueLen syncQueueLen
node.setupDoppelgangerDetection(slot) node.setupDoppelgangerDetection(slot)
node.addMessageHandlers()
doAssert node.getTopicSubscriptionEnabled() block addRemoveHandlers:
elif case targetGossipState
topicSubscriptionEnabled and of GossipState.Disconnected:
syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and case node.gossipState:
# Filter out underflow from debtsCount; plausible queue lengths can't of GossipState.Disconnected: break
# exceed wallslot, with safety margin. else:
syncQueueLen < 2 * slot.uint64: if syncQueueLen > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and
debug "Disabling topic subscriptions", # Filter out underflow from debtsCount; plausible queue lengths can't
wallSlot = slot, # exceed wallslot, with safety margin.
headSlot = node.dag.head.slot, syncQueueLen < 2 * slot.uint64:
syncQueueLen debug "Disabling topic subscriptions",
node.removeMessageHandlers() wallSlot = slot,
headSlot = node.dag.head.slot,
syncQueueLen
node.removeAllMessageHandlers()
node.gossipState = GossipState.Disconnected
break
of GossipState.ConnectedToPhase0:
case node.gossipState:
of GossipState.ConnectedToPhase0: break
of GossipState.Disconnected:
node.addPhase0MessageHandlers()
of GossipState.InTransitionToAltair:
warn "Unexpected clock regression during altair transition"
node.removeAltairMessageHandlers()
of GossipState.ConnectedToAltair:
warn "Unexpected clock regression during altair transition"
node.removeAltairMessageHandlers()
node.addPhase0MessageHandlers()
of GossipState.InTransitionToAltair:
case node.gossipState:
of GossipState.InTransitionToAltair: break
of GossipState.Disconnected:
node.addPhase0MessageHandlers()
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToPhase0:
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToAltair:
warn "Unexpected clock regression during altair transition"
node.addPhase0MessageHandlers()
of GossipState.ConnectedToAltair:
case node.gossipState:
of GossipState.ConnectedToAltair: break
of GossipState.Disconnected:
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToPhase0:
node.removePhase0MessageHandlers()
node.addAltairMessageHandlers(slot)
of GossipState.InTransitionToAltair:
node.removePhase0MessageHandlers()
node.gossipState = targetGossipState
# Subscription or unsubscription might have occurred; recheck. Since Nimbus # Subscription or unsubscription might have occurred; recheck. Since Nimbus
# initially subscribes to all subnets, simply do not ever cycle attestation # initially subscribes to all subnets, simply do not ever cycle attestation
@ -1047,7 +1130,8 @@ proc startSyncManager(node: BeaconNode) =
true true
proc onDeletePeer(peer: Peer) = proc onDeletePeer(peer: Peer) =
if peer.connectionState notin {Disconnecting, ConnectionState.Disconnected}: if peer.connectionState notin {ConnectionState.Disconnecting,
ConnectionState.Disconnected}:
if peer.score < PeerScoreLowLimit: if peer.score < PeerScoreLowLimit:
debug "Peer was removed from PeerPool due to low score", peer = peer, debug "Peer was removed from PeerPool due to low score", peer = peer,
peer_score = peer.score, score_low_limit = PeerScoreLowLimit, peer_score = peer.score, score_low_limit = PeerScoreLowLimit,
@ -1098,7 +1182,7 @@ proc installMessageValidators(node: BeaconNode) =
# These validators stay around the whole time, regardless of which specific # These validators stay around the whole time, regardless of which specific
# subnets are subscribed to during any given epoch. # subnets are subscribed to during any given epoch.
# TODO altair-transition # TODO altair-transition, well, without massive copy/pasting (extract to template or etc)
for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64: for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64:
closureScope: closureScope:
let subnet_id = SubnetId(it) let subnet_id = SubnetId(it)
@ -1133,6 +1217,41 @@ proc installMessageValidators(node: BeaconNode) =
proc (signedVoluntaryExit: SignedVoluntaryExit): ValidationResult = proc (signedVoluntaryExit: SignedVoluntaryExit): ValidationResult =
node.processor[].voluntaryExitValidator(signedVoluntaryExit)) node.processor[].voluntaryExitValidator(signedVoluntaryExit))
# TODO copy/paste starts here; templatize whole thing
for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64:
closureScope:
let subnet_id = SubnetId(it)
node.network.addAsyncValidator(
getAttestationTopic(node.dag.forkDigests.altair, subnet_id),
# This proc needs to be within closureScope; don't lift out of loop.
proc(attestation: Attestation): Future[ValidationResult] =
node.processor.attestationValidator(attestation, subnet_id))
node.network.addAsyncValidator(
getAggregateAndProofsTopic(node.dag.forkDigests.altair),
proc(signedAggregateAndProof: SignedAggregateAndProof): Future[ValidationResult] =
node.processor.aggregateValidator(signedAggregateAndProof))
node.network.addValidator(
getBeaconBlocksTopic(node.dag.forkDigests.altair),
proc (signedBlock: altair.SignedBeaconBlock): ValidationResult =
node.processor[].blockValidator(signedBlock))
node.network.addValidator(
getAttesterSlashingsTopic(node.dag.forkDigests.altair),
proc (attesterSlashing: AttesterSlashing): ValidationResult =
node.processor[].attesterSlashingValidator(attesterSlashing))
node.network.addValidator(
getProposerSlashingsTopic(node.dag.forkDigests.altair),
proc (proposerSlashing: ProposerSlashing): ValidationResult =
node.processor[].proposerSlashingValidator(proposerSlashing))
node.network.addValidator(
getVoluntaryExitsTopic(node.dag.forkDigests.altair),
proc (signedVoluntaryExit: SignedVoluntaryExit): ValidationResult =
node.processor[].voluntaryExitValidator(signedVoluntaryExit))
proc stop*(node: BeaconNode) = proc stop*(node: BeaconNode) =
bnStatus = BeaconNodeStatus.Stopping bnStatus = BeaconNodeStatus.Stopping
notice "Graceful shutdown" notice "Graceful shutdown"
@ -1173,10 +1292,7 @@ proc run*(node: BeaconNode) {.raises: [Defect, CatchableError].} =
node.requestManager.start() node.requestManager.start()
node.startSyncManager() node.startSyncManager()
if not startTime.toSlot().afterGenesis: node.updateGossipStatus(startTime.slotOrZero)
node.setupDoppelgangerDetection(startTime.slotOrZero())
node.addMessageHandlers()
doAssert node.getTopicSubscriptionEnabled()
## Ctrl+C handling ## Ctrl+C handling
proc controlCHandler() {.noconv.} = proc controlCHandler() {.noconv.} =

View File

@ -203,6 +203,18 @@ proc slash_validator*(
func genesis_time_from_eth1_timestamp*(cfg: RuntimeConfig, eth1_timestamp: uint64): uint64 = func genesis_time_from_eth1_timestamp*(cfg: RuntimeConfig, eth1_timestamp: uint64): uint64 =
eth1_timestamp + cfg.GENESIS_DELAY eth1_timestamp + cfg.GENESIS_DELAY
func genesisFork*(cfg: RuntimeConfig): Fork =
Fork(
previous_version: cfg.GENESIS_FORK_VERSION,
current_version: cfg.GENESIS_FORK_VERSION,
epoch: GENESIS_EPOCH)
func altairFork*(cfg: RuntimeConfig): Fork =
Fork(
previous_version: cfg.GENESIS_FORK_VERSION,
current_version: cfg.ALTAIR_FORK_VERSION,
epoch: cfg.ALTAIR_FORK_EPOCH)
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#genesis # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#genesis
proc initialize_beacon_state_from_eth1*( proc initialize_beacon_state_from_eth1*(
cfg: RuntimeConfig, cfg: RuntimeConfig,

View File

@ -278,3 +278,21 @@ func shortLog*(x: ForkedSignedBeaconBlock | ForkedTrustedSignedBeaconBlock): aut
chronicles.formatIt ForkedSignedBeaconBlock: it.shortLog chronicles.formatIt ForkedSignedBeaconBlock: it.shortLog
chronicles.formatIt ForkedTrustedSignedBeaconBlock: it.shortLog chronicles.formatIt ForkedTrustedSignedBeaconBlock: it.shortLog
proc forkAtEpoch*(cfg: RuntimeConfig, epoch: Epoch): Fork =
if epoch < cfg.ALTAIR_FORK_EPOCH:
genesisFork(cfg)
else:
altairFork(cfg)
proc forkVersionAtEpoch*(cfg: RuntimeConfig, epoch: Epoch): Version =
if epoch < cfg.ALTAIR_FORK_EPOCH:
cfg.GENESIS_FORK_VERSION
else:
cfg.ALTAIR_FORK_VERSION
proc nextForkEpochAtEpoch*(cfg: RuntimeConfig, epoch: Epoch): Epoch =
if epoch < cfg.ALTAIR_FORK_EPOCH:
cfg.ALTAIR_FORK_EPOCH
else:
FAR_FUTURE_EPOCH

View File

@ -130,7 +130,7 @@ proc getCurrentStatus*(state: BeaconSyncNetworkState): StatusMsg {.gcsafe.} =
wallTimeSlot = dag.beaconClock.toBeaconTime(wallTime).slotOrZero wallTimeSlot = dag.beaconClock.toBeaconTime(wallTime).slotOrZero
StatusMsg( StatusMsg(
forkDigest: state.dag.forkDigestAtSlot(wallTimeSlot), forkDigest: state.dag.forkDigestAtEpoch(wallTimeSlot.epoch),
finalizedRoot: finalizedRoot:
getStateField(dag.headState.data, finalized_checkpoint).root, getStateField(dag.headState.data, finalized_checkpoint).root,
finalizedEpoch: finalizedEpoch: