Merge fork gossip support (#3213)

* Merge fork gossip support

* index directly by BeaconStateFork and remove debugging log statement
This commit is contained in:
tersec 2021-12-21 14:24:23 +00:00 committed by GitHub
parent 0304d28c9e
commit 0d4e49f946
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 203 additions and 113 deletions

View File

@ -187,6 +187,11 @@ OK: 3/3 Fail: 0/3 Skip: 0/3
+ should raise on unknown data OK
```
OK: 7/7 Fail: 0/7 Skip: 0/7
## Gossip fork transition
```diff
+ Gossip fork transition OK
```
OK: 1/1 Fail: 0/1 Skip: 0/1
## Gossip validation [Preset: mainnet]
```diff
+ Any committee index is valid OK

View File

@ -35,11 +35,7 @@ export
type
RpcServer* = RpcHttpServer
GossipState* = enum
Disconnected
ConnectedToPhase0
InTransitionToAltair
ConnectedToAltair
GossipState* = set[BeaconStateFork]
BeaconNode* = ref object
nickname*: string

View File

@ -509,10 +509,14 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
onFinHappened: onFinCb
)
doAssert cfg.GENESIS_FORK_VERSION != cfg.ALTAIR_FORK_VERSION
doAssert cfg.GENESIS_FORK_VERSION != cfg.MERGE_FORK_VERSION
doAssert cfg.ALTAIR_FORK_VERSION != cfg.MERGE_FORK_VERSION
let forkVersions =
[cfg.GENESIS_FORK_VERSION, cfg.ALTAIR_FORK_VERSION, cfg.MERGE_FORK_VERSION,
cfg.SHARDING_FORK_VERSION]
for i in 0 ..< forkVersions.len:
for j in i+1 ..< forkVersions.len:
doAssert forkVersions[i] != forkVersions[j]
doAssert cfg.ALTAIR_FORK_EPOCH <= cfg.MERGE_FORK_EPOCH
doAssert cfg.MERGE_FORK_EPOCH <= cfg.SHARDING_FORK_EPOCH
doAssert dag.updateFlags in [{}, {verifyFinalization}]
var cache: StateCache

View File

@ -459,7 +459,7 @@ proc init(T: type BeaconNode,
processor: processor,
blockProcessor: blockProcessor,
consensusManager: consensusManager,
gossipState: GossipState.Disconnected,
gossipState: {},
beaconClock: beaconClock,
onAttestationSent: onAttestationSent,
validatorMonitor: validatorMonitor
@ -512,9 +512,9 @@ func verifyFinalization(node: BeaconNode, slot: Slot) =
func subnetLog(v: BitArray): string =
$toSeq(v.oneIndices())
# https://github.com/ethereum/consensus-specs/blob/v1.1.2/specs/phase0/validator.md#phase-0-attestation-subnet-stability
# https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/phase0/validator.md#phase-0-attestation-subnet-stability
proc updateAttestationSubnetHandlers(node: BeaconNode, slot: Slot) =
if node.gossipState == GossipState.Disconnected:
if node.gossipState.card == 0:
# When disconnected, updateGossipState is responsible for all things
# subnets - in particular, it will remove subscriptions on the edge where
# we enter the disconnected state.
@ -536,20 +536,16 @@ proc updateAttestationSubnetHandlers(node: BeaconNode, slot: Slot) =
# Remember what we subscribed to, so we can unsubscribe later
node.actionTracker.subscribedSubnets = subnets
case node.gossipState
of GossipState.Disconnected:
raiseAssert "Checked above"
of GossipState.ConnectedToPhase0:
node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, node.dag.forkDigests.phase0)
node.network.subscribeAttestationSubnets(subscribeSubnets, node.dag.forkDigests.phase0)
of GossipState.InTransitionToAltair:
node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, node.dag.forkDigests.phase0)
node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, node.dag.forkDigests.altair)
node.network.subscribeAttestationSubnets(subscribeSubnets, node.dag.forkDigests.phase0)
node.network.subscribeAttestationSubnets(subscribeSubnets, node.dag.forkDigests.altair)
of GossipState.ConnectedToAltair:
node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, node.dag.forkDigests.altair)
node.network.subscribeAttestationSubnets(subscribeSubnets, node.dag.forkDigests.altair)
let forkDigests: array[BeaconStateFork, auto] = [
node.dag.forkDigests.phase0,
node.dag.forkDigests.altair,
node.dag.forkDigests.merge
]
for gossipFork in node.gossipState:
let forkDigest = forkDigests[gossipFork]
node.network.unsubscribeAttestationSubnets(unsubscribeSubnets, forkDigest)
node.network.subscribeAttestationSubnets(subscribeSubnets, forkDigest)
debug "Attestation subnets",
slot, epoch = slot.epoch, gossipState = node.gossipState,
@ -557,7 +553,8 @@ proc updateAttestationSubnetHandlers(node: BeaconNode, slot: Slot) =
aggregateSubnets = subnetLog(aggregateSubnets),
prevSubnets = subnetLog(prevSubnets),
subscribeSubnets = subnetLog(subscribeSubnets),
unsubscribeSubnets = subnetLog(unsubscribeSubnets)
unsubscribeSubnets = subnetLog(unsubscribeSubnets),
gossipState = node.gossipState
# inspired by lighthouse research here
# https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c#file-generate-scoring-params-py
@ -608,18 +605,20 @@ static:
aggregateTopicParams.validateParameters().tryGet()
basicParams.validateParameters.tryGet()
proc addPhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
node.network.subscribe(getBeaconBlocksTopic(forkDigest), blocksTopicParams, enableTopicMetrics = true)
proc addPhase0MessageHandlers(
node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
node.network.subscribe(
getBeaconBlocksTopic(forkDigest), blocksTopicParams,
enableTopicMetrics = true)
node.network.subscribe(getAttesterSlashingsTopic(forkDigest), basicParams)
node.network.subscribe(getProposerSlashingsTopic(forkDigest), basicParams)
node.network.subscribe(getVoluntaryExitsTopic(forkDigest), basicParams)
node.network.subscribe(getAggregateAndProofsTopic(forkDigest), aggregateTopicParams, enableTopicMetrics = true)
node.network.subscribe(
getAggregateAndProofsTopic(forkDigest), aggregateTopicParams,
enableTopicMetrics = true)
# updateAttestationSubnetHandlers subscribes attestation subnets
proc addPhase0MessageHandlers(node: BeaconNode, slot: Slot) =
addPhase0MessageHandlers(node, node.dag.forkDigests.phase0, slot)
proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.network.unsubscribe(getBeaconBlocksTopic(forkDigest))
node.network.unsubscribe(getVoluntaryExitsTopic(forkDigest))
@ -633,9 +632,6 @@ proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.actionTracker.subscribedSubnets = default(AttnetBits)
proc removePhase0MessageHandlers(node: BeaconNode) =
removePhase0MessageHandlers(node, node.dag.forkDigests.phase0)
proc addAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
node.addPhase0MessageHandlers(forkDigest, slot)
@ -653,9 +649,6 @@ proc addAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Sl
getSyncCommitteeContributionAndProofTopic(forkDigest), basicParams)
node.network.updateSyncnetsMetadata(syncnets)
proc addAltairMessageHandlers(node: BeaconNode, slot: Slot) =
addAltairMessageHandlers(node, node.dag.forkDigests.altair, slot)
proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.removePhase0MessageHandlers(forkDigest)
@ -668,20 +661,6 @@ proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.network.unsubscribe(
getSyncCommitteeContributionAndProofTopic(forkDigest))
proc removeAltairMessageHandlers(node: BeaconNode) =
removeAltairMessageHandlers(node, node.dag.forkDigests.altair)
proc addMergeMessageHandlers(node: BeaconNode, slot: Slot) =
addAltairMessageHandlers(node, node.dag.forkDigests.merge, slot)
proc removeMergeMessageHandlers(node: BeaconNode) =
removeAltairMessageHandlers(node, node.dag.forkDigests.merge)
proc removeAllMessageHandlers(node: BeaconNode) =
node.removePhase0MessageHandlers()
node.removeAltairMessageHandlers()
node.removeMergeMessageHandlers()
proc setupDoppelgangerDetection(node: BeaconNode, slot: Slot) =
# When another client's already running, this is very likely to detect
# potential duplicate validators, which can trigger slashing.
@ -726,17 +705,34 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
if slot > head.slot: (slot - head.slot).uint64
else: 0'u64
targetGossipState =
if headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER:
GossipState.Disconnected
elif slot.epoch + 1 < node.dag.cfg.ALTAIR_FORK_EPOCH:
GossipState.ConnectedToPhase0
elif slot.epoch >= node.dag.cfg.ALTAIR_FORK_EPOCH:
GossipState.ConnectedToAltair
else:
GossipState.InTransitionToAltair
getTargetGossipState(
slot.epoch,
node.dag.cfg.ALTAIR_FORK_EPOCH,
node.dag.cfg.MERGE_FORK_EPOCH,
headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER)
if node.gossipState == GossipState.Disconnected and
targetGossipState != GossipState.Disconnected:
doAssert targetGossipState.card <= 2
let
newGossipForks = targetGossipState - node.gossipState
oldGossipForks = node.gossipState - targetGossipState
doAssert newGossipForks.card <= 2
doAssert oldGossipForks.card <= 2
func maxGossipFork(gossipState: GossipState): int =
var res = -1
for gossipFork in gossipState:
res = max(res, gossipFork.int)
res
if maxGossipFork(targetGossipState) < maxGossipFork(node.gossipState) and
targetGossipState != {}:
warn "Unexpected clock regression during transition",
targetGossipState,
gossipState = node.gossipState
if node.gossipState.card == 0 and targetGossipState.card > 0:
# We are synced, so we will connect
debug "Enabling topic subscriptions",
wallSlot = slot,
@ -755,53 +751,36 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
node.actionTracker.updateActions(
node.dag.getEpochRef(head, slot.epoch + 1))
case targetGossipState
of GossipState.Disconnected:
case node.gossipState:
of GossipState.Disconnected: discard
else:
debug "Disabling topic subscriptions",
wallSlot = slot,
headSlot = head.slot,
headDistance
node.removeAllMessageHandlers()
node.gossipState = GossipState.Disconnected
if node.gossipState.card > 0 and targetGossipState.card == 0:
debug "Disabling topic subscriptions",
wallSlot = slot,
headSlot = head.slot,
headDistance
of GossipState.ConnectedToPhase0:
case node.gossipState:
of GossipState.ConnectedToPhase0: discard
of GossipState.Disconnected:
node.addPhase0MessageHandlers(slot)
of GossipState.InTransitionToAltair:
warn "Unexpected clock regression during altair transition"
node.removeAltairMessageHandlers()
of GossipState.ConnectedToAltair:
warn "Unexpected clock regression during altair transition"
node.removeAltairMessageHandlers()
node.addPhase0MessageHandlers(slot)
# These depend on forks.BeaconStateFork being properly ordered
let forkDigests: array[BeaconStateFork, auto] = [
node.dag.forkDigests.phase0,
node.dag.forkDigests.altair,
node.dag.forkDigests.merge
]
of GossipState.InTransitionToAltair:
case node.gossipState:
of GossipState.InTransitionToAltair: discard
of GossipState.Disconnected:
node.addPhase0MessageHandlers(slot)
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToPhase0:
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToAltair:
warn "Unexpected clock regression during altair transition"
node.addPhase0MessageHandlers(slot)
const removeMessageHandlers: array[BeaconStateFork, auto] = [
removePhase0MessageHandlers,
removeAltairMessageHandlers,
removeAltairMessageHandlers # with different forkDigest
]
of GossipState.ConnectedToAltair:
case node.gossipState:
of GossipState.ConnectedToAltair: discard
of GossipState.Disconnected:
node.addAltairMessageHandlers(slot)
of GossipState.ConnectedToPhase0:
node.removePhase0MessageHandlers()
node.addAltairMessageHandlers(slot)
of GossipState.InTransitionToAltair:
node.removePhase0MessageHandlers()
for gossipFork in oldGossipForks:
removeMessageHandlers[gossipFork](node, forkDigests[gossipFork])
const addMessageHandlers: array[BeaconStateFork, auto] = [
addPhase0MessageHandlers,
addAltairMessageHandlers,
addAltairMessageHandlers # with different forkDigest
]
for gossipFork in newGossipForks:
addMessageHandlers[gossipFork](node, forkDigests[gossipFork], slot)
node.gossipState = targetGossipState
node.updateAttestationSubnetHandlers(slot)

View File

@ -7,7 +7,7 @@
import validator_client/[common, fallback_service, duties_service,
attestation_service, fork_service]
proc initGenesis*(vc: ValidatorClientRef): Future[RestGenesis] {.async.} =
proc initGenesis(vc: ValidatorClientRef): Future[RestGenesis] {.async.} =
info "Initializing genesis", nodes_count = len(vc.beaconNodes)
var nodes = vc.beaconNodes
while true:
@ -73,7 +73,7 @@ proc initGenesis*(vc: ValidatorClientRef): Future[RestGenesis] {.async.} =
dec(counter)
return melem
proc initValidators*(vc: ValidatorClientRef): Future[bool] {.async.} =
proc initValidators(vc: ValidatorClientRef): Future[bool] {.async.} =
info "Initializaing validators", path = vc.config.validatorsDir()
var duplicates: seq[ValidatorPubKey]
for item in vc.config.validatorItems():
@ -86,7 +86,7 @@ proc initValidators*(vc: ValidatorClientRef): Future[bool] {.async.} =
vc.attachedValidators.addLocalValidator(item)
return true
proc initClock*(vc: ValidatorClientRef): Future[BeaconClock] {.async.} =
proc initClock(vc: ValidatorClientRef): Future[BeaconClock] {.async.} =
# This procedure performs initialization of BeaconClock using current genesis
# information. It also performs waiting for genesis.
let res = BeaconClock.init(vc.beaconGenesis.genesis_time)
@ -101,7 +101,7 @@ proc initClock*(vc: ValidatorClientRef): Future[BeaconClock] {.async.} =
await sleepAsync(genesisTime.offset)
return res
proc asyncInit*(vc: ValidatorClientRef) {.async.} =
proc asyncInit(vc: ValidatorClientRef) {.async.} =
vc.beaconGenesis = await vc.initGenesis()
info "Genesis information", genesis_time = vc.beaconGenesis.genesis_time,
genesis_fork_version = vc.beaconGenesis.genesis_fork_version,
@ -151,7 +151,7 @@ proc onSlotStart(vc: ValidatorClientRef, wallTime: BeaconTime,
blockIn = vc.getDurationToNextBlock(wallSlot.slot),
delay = shortLog(delay)
proc asyncRun*(vc: ValidatorClientRef) {.async.} =
proc asyncRun(vc: ValidatorClientRef) {.async.} =
vc.fallbackService.start()
vc.forkService.start()
vc.dutiesService.start()

View File

@ -404,13 +404,13 @@ proc nextForkEpochAtEpoch*(cfg: RuntimeConfig, epoch: Epoch): Epoch =
of BeaconStateFork.Altair: cfg.MERGE_FORK_EPOCH
of BeaconStateFork.Phase0: cfg.ALTAIR_FORK_EPOCH
func getForkSchedule*(cfg: RuntimeConfig): array[2, Fork] =
func getForkSchedule*(cfg: RuntimeConfig): array[3, Fork] =
## This procedure returns list of known and/or scheduled forks.
##
## This procedure is used by HTTP REST framework and validator client.
##
## NOTE: Update this procedure when new fork will be scheduled.
[cfg.genesisFork(), cfg.altairFork()]
[cfg.genesisFork(), cfg.altairFork(), cfg.mergeFork()]
type
# The first few fields of a state, shared across all forks

View File

@ -125,3 +125,44 @@ func getDiscoveryForkID*(cfg: RuntimeConfig,
fork_digest: fork_digest,
next_fork_version: current_fork_version,
next_fork_epoch: FAR_FUTURE_EPOCH)
# https://github.com/ethereum/consensus-specs/blob/v1.1.6/specs/altair/p2p-interface.md#transitioning-the-gossip
func getTargetGossipState*(
epoch, ALTAIR_FORK_EPOCH, MERGE_FORK_EPOCH: Epoch, isBehind: bool):
set[BeaconStateFork] =
if isBehind:
{}
# The order of these checks doesn't matter.
elif epoch >= MERGE_FORK_EPOCH:
{BeaconStateFork.Merge}
elif epoch + 1 < ALTAIR_FORK_EPOCH:
{BeaconStateFork.Phase0}
# Order remaining checks so ALTAIR_FORK_EPOCH == MERGE_FORK_EPOCH works
# and when the transition zones align contiguously, or are separated by
# intermediate pure-Altair epochs.
#
# In the first case, should never enable Altair, and there's also never
# any Phase -> Altair or Altair -> Merge gossip transition epoch. Given
# contiguous Phase0 -> Altair and Altair -> Merge gossip transitions, a
# pure Altair state gossip state never occurs, but it works without any
# special cases so long as one checks for transition-to-fork+1 before a
# pure fork gossip state.
#
# Therefore, check for transition-to-merge before pure-Altair.
elif epoch + 1 >= MERGE_FORK_EPOCH:
# As there are only two fork epochs and there's no transition to phase0
{if ALTAIR_FORK_EPOCH == MERGE_FORK_EPOCH:
BeaconStateFork.Phase0
else:
BeaconStateFork.Altair,
BeaconStateFork.Merge}
elif epoch >= ALTAIR_FORK_EPOCH:
{BeaconStateFork.Altair}
# Must be after the case which catches phase0 => merge
elif epoch + 1 >= ALTAIR_FORK_EPOCH:
{BeaconStateFork.Phase0, BeaconStateFork.Altair}
else:
raiseAssert "Unknown target gossip state"

View File

@ -23,6 +23,7 @@ import # Unit test
./test_eth2_ssz_serialization,
./test_exit_pool,
./test_forks,
./test_gossip_transition,
./test_gossip_validation,
./test_helpers,
./test_honest_validator,

View File

@ -0,0 +1,64 @@
{.used.}
import
unittest2,
./testutil,
../beacon_chain/spec/[forks, network]
template getTargetGossipState(a, b, c: int, isBehind: bool): auto =
getTargetGossipState(a.Epoch, b.Epoch, c.Epoch, isBehind)
suite "Gossip fork transition":
test "Gossip fork transition":
check:
getTargetGossipState(0, 0, 0, false) == {BeaconStateFork.Merge}
getTargetGossipState(0, 0, 2, false) == {BeaconStateFork.Altair}
getTargetGossipState(0, 1, 2, false) == {BeaconStateFork.Phase0, BeaconStateFork.Altair}
getTargetGossipState(0, 2, 3, false) == {BeaconStateFork.Phase0}
getTargetGossipState(0, 2, 5, false) == {BeaconStateFork.Phase0}
getTargetGossipState(0, 3, 4, false) == {BeaconStateFork.Phase0}
getTargetGossipState(0, 3, 5, false) == {BeaconStateFork.Phase0}
getTargetGossipState(0, 4, 4, false) == {BeaconStateFork.Phase0}
getTargetGossipState(0, 4, 5, false) == {BeaconStateFork.Phase0}
getTargetGossipState(1, 0, 1, false) == {BeaconStateFork.Merge}
getTargetGossipState(1, 0, 5, false) == {BeaconStateFork.Altair}
getTargetGossipState(1, 1, 4, false) == {BeaconStateFork.Altair}
getTargetGossipState(2, 0, 2, false) == {BeaconStateFork.Merge}
getTargetGossipState(2, 2, 3, false) == {BeaconStateFork.Altair, BeaconStateFork.Merge}
getTargetGossipState(2, 2, 4, false) == {BeaconStateFork.Altair}
getTargetGossipState(2, 3, 4, false) == {BeaconStateFork.Phase0, BeaconStateFork.Altair}
getTargetGossipState(2, 3, 5, true) == {}
getTargetGossipState(2, 5, 5, false) == {BeaconStateFork.Phase0}
getTargetGossipState(3, 0, 2, false) == {BeaconStateFork.Merge}
getTargetGossipState(3, 0, 3, false) == {BeaconStateFork.Merge}
getTargetGossipState(3, 0, 5, false) == {BeaconStateFork.Altair}
getTargetGossipState(3, 1, 2, false) == {BeaconStateFork.Merge}
getTargetGossipState(3, 1, 1, false) == {BeaconStateFork.Merge}
getTargetGossipState(3, 1, 3, false) == {BeaconStateFork.Merge}
getTargetGossipState(3, 1, 5, true) == {}
getTargetGossipState(3, 1, 4, false) == {BeaconStateFork.Altair, BeaconStateFork.Merge}
getTargetGossipState(3, 2, 3, false) == {BeaconStateFork.Merge}
getTargetGossipState(3, 3, 4, false) == {BeaconStateFork.Altair, BeaconStateFork.Merge}
getTargetGossipState(3, 3, 4, true) == {}
getTargetGossipState(3, 4, 4, false) == {BeaconStateFork.Phase0, BeaconStateFork.Merge}
getTargetGossipState(4, 0, 0, false) == {BeaconStateFork.Merge}
getTargetGossipState(4, 0, 1, false) == {BeaconStateFork.Merge}
getTargetGossipState(4, 1, 1, false) == {BeaconStateFork.Merge}
getTargetGossipState(4, 1, 3, false) == {BeaconStateFork.Merge}
getTargetGossipState(4, 2, 4, false) == {BeaconStateFork.Merge}
getTargetGossipState(4, 3, 4, false) == {BeaconStateFork.Merge}
getTargetGossipState(4, 4, 4, false) == {BeaconStateFork.Merge}
getTargetGossipState(4, 5, 5, false) == {BeaconStateFork.Phase0, BeaconStateFork.Merge}
getTargetGossipState(5, 0, 0, false) == {BeaconStateFork.Merge}
getTargetGossipState(5, 0, 2, false) == {BeaconStateFork.Merge}
getTargetGossipState(5, 0, 4, false) == {BeaconStateFork.Merge}
getTargetGossipState(5, 0, 5, false) == {BeaconStateFork.Merge}
getTargetGossipState(5, 0, 5, true) == {}
getTargetGossipState(5, 1, 5, false) == {BeaconStateFork.Merge}
getTargetGossipState(5, 2, 2, false) == {BeaconStateFork.Merge}
getTargetGossipState(5, 2, 4, true) == {}
getTargetGossipState(5, 3, 4, false) == {BeaconStateFork.Merge}
getTargetGossipState(5, 3, 5, false) == {BeaconStateFork.Merge}
getTargetGossipState(5, 5, 5, false) == {BeaconStateFork.Merge}
getTargetGossipState(2, 0, 3, false) == {BeaconStateFork.Altair, BeaconStateFork.Merge}
getTargetGossipState(4, 1, 2, false) == {BeaconStateFork.Merge}