send attestations and exit messages on fork-appropriate topic (#2773)

* send attestations and exit messages on fork-appropriate topic

* document why use wall clock over attestation slot

* centralize some fork-topic-picking-logic in eth2_network

* pick up new test in summary

* allow specified GetTimeFn for testing purposes

* add GenesisTime and use it in eth2_network

* replace GetTimeFn and GenesisTime with GetBeaconTimeFn
This commit is contained in:
tersec 2021-08-19 10:45:31 +00:00 committed by GitHub
parent a7a65bce42
commit 317b6de4e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 91 additions and 71 deletions

View File

@ -33,7 +33,7 @@ type
BeaconTime* = distinct Duration ## Nanoseconds from beacon genesis time
GetTimeFn* = proc(): Time {.gcsafe, raises: [Defect].}
GetBeaconTimeFn* = proc(): BeaconTime {.gcsafe, raises: [Defect].}
proc init*(T: type BeaconClock, genesis_time: uint64): T =
# ~290 billion years into the future

View File

@ -63,7 +63,7 @@ type
# ----------------------------------------------------------------
consensusManager: ref ConsensusManager
## Blockchain DAG, AttestationPool and Quarantine
getTime: GetTimeFn
getBeaconTime: GetBeaconTimeFn
# Initialization
# ------------------------------------------------------------------------------
@ -72,17 +72,14 @@ proc new*(T: type BlockProcessor,
dumpEnabled: bool,
dumpDirInvalid, dumpDirIncoming: string,
consensusManager: ref ConsensusManager,
getTime: GetTimeFn): ref BlockProcessor =
getBeaconTime: GetBeaconTimeFn): ref BlockProcessor =
(ref BlockProcessor)(
dumpEnabled: dumpEnabled,
dumpDirInvalid: dumpDirInvalid,
dumpDirIncoming: dumpDirIncoming,
blocksQueue: newAsyncQueue[BlockEntry](),
consensusManager: consensusManager,
getTime: getTime)
proc getCurrentBeaconTime*(self: BlockProcessor): BeaconTime =
self.consensusManager.dag.beaconClock.toBeaconTime(self.getTime())
getBeaconTime: getBeaconTime)
# Sync callbacks
# ------------------------------------------------------------------------------
@ -178,7 +175,7 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) =
blockRoot = shortLog(entry.blck.root)
let
wallTime = self.getCurrentBeaconTime()
wallTime = self.getBeaconTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:

View File

@ -86,7 +86,7 @@ type
quarantine*: QuarantineRef
# Application-provided current time provider (to facilitate testing)
getTime*: GetTimeFn
getCurrentBeaconTime*: GetBeaconTimeFn
# Initialization
# ------------------------------------------------------------------------------
@ -100,7 +100,7 @@ proc new*(T: type Eth2Processor,
validatorPool: ref ValidatorPool,
quarantine: QuarantineRef,
rng: ref BrHmacDrbgContext,
getTime: GetTimeFn): ref Eth2Processor =
getBeaconTime: GetBeaconTimeFn): ref Eth2Processor =
(ref Eth2Processor)(
doppelGangerDetectionEnabled: doppelGangerDetectionEnabled,
doppelgangerDetection: DoppelgangerProtection(
@ -111,7 +111,7 @@ proc new*(T: type Eth2Processor,
exitPool: exitPool,
validatorPool: validatorPool,
quarantine: quarantine,
getTime: getTime,
getCurrentBeaconTime: getBeaconTime,
batchCrypto: BatchCrypto.new(
rng = rng,
# Only run eager attestation signature verification if we're not
@ -119,9 +119,6 @@ proc new*(T: type Eth2Processor,
eager = proc(): bool = not blockProcessor[].hasBlocks())
)
proc getCurrentBeaconTime*(self: Eth2Processor|ref Eth2Processor): BeaconTime =
self.dag.beaconClock.toBeaconTime(self.getTime())
# Gossip Management
# -----------------------------------------------------------------------------------

View File

@ -85,6 +85,8 @@ type
rng*: ref BrHmacDrbgContext
peers*: Table[PeerID, Peer]
validTopics: HashSet[string]
cfg: RuntimeConfig
getBeaconTime: GetBeaconTimeFn
EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers
@ -1126,10 +1128,11 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
peer = peerId, peer_state = peer.connectionState
peer.connectionState = Disconnected
proc new*(T: type Eth2Node, config: BeaconNodeConf,
proc new*(T: type Eth2Node, config: BeaconNodeConf, runtimeCfg: RuntimeConfig,
enrForkId: ENRForkID, forkDigests: ForkDigestsRef,
switch: Switch, pubsub: GossipSub, ip: Option[ValidIpAddress],
tcpPort, udpPort: Option[Port], privKey: keys.PrivateKey, discovery: bool,
getBeaconTime: GetBeaconTimeFn, switch: Switch,
pubsub: GossipSub, ip: Option[ValidIpAddress], tcpPort,
udpPort: Option[Port], privKey: keys.PrivateKey, discovery: bool,
rng: ref BrHmacDrbgContext): T {.raises: [Defect, CatchableError].} =
let
metadata = getPersistentNetMetadata(config)
@ -1146,6 +1149,7 @@ proc new*(T: type Eth2Node, config: BeaconNodeConf,
switch: switch,
pubsub: pubsub,
wantedPeers: config.maxPeers,
cfg: runtimeCfg,
peerPool: newPeerPool[Peer, PeerID](maxPeers = config.maxPeers),
# Its important here to create AsyncQueue with limited size, otherwise
# it could produce HIGH cpu usage.
@ -1155,6 +1159,7 @@ proc new*(T: type Eth2Node, config: BeaconNodeConf,
metadata: metadata,
forkId: enrForkId,
forkDigests: forkDigests,
getBeaconTime: getBeaconTime,
discovery: Eth2DiscoveryProtocol.new(
config, ip, tcpPort, udpPort, privKey,
{
@ -1586,11 +1591,12 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,
netKeys: NetKeyPair,
cfg: RuntimeConfig,
forkDigests: ForkDigestsRef,
wallEpoch: Epoch,
getBeaconTime: GetBeaconTimeFn,
genesisValidatorsRoot: Eth2Digest): Eth2Node
{.raises: [Defect, CatchableError].} =
let
enrForkId = getENRForkID(cfg, wallEpoch, genesisValidatorsRoot)
enrForkId = getENRForkID(
cfg, getBeaconTime().slotOrZero.epoch, genesisValidatorsRoot)
(extIp, extTcpPort, extUdpPort) = try: setupAddress(
config.nat, config.listenAddress, config.tcpPort, config.udpPort, clientId)
@ -1673,13 +1679,10 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,
except Exception as exc: raiseAssert exc.msg # TODO fix libp2p
switch.mount(pubsub)
Eth2Node.new(config, enrForkId,
forkDigests,
switch, pubsub,
extIp, extTcpPort, extUdpPort,
netKeys.seckey.asEthKey,
discovery = config.discv5Enabled,
rng = rng)
Eth2Node.new(
config, cfg, enrForkId, forkDigests, getBeaconTime, switch, pubsub, extIp,
extTcpPort, extUdpPort, netKeys.seckey.asEthKey,
discovery = config.discv5Enabled, rng = rng)
proc announcedENR*(node: Eth2Node): enr.Record =
doAssert node.discovery != nil, "The Eth2Node must be initialized"
@ -1885,3 +1888,40 @@ func getStabilitySubnetLength*(node: Eth2Node): uint64 =
func getRandomSubnetId*(node: Eth2Node): SubnetId =
node.rng[].rand(ATTESTATION_SUBNET_COUNT - 1).SubnetId
func forkDigestAtEpoch(node: Eth2Node, epoch: Epoch): ForkDigest =
if epoch < node.cfg.ALTAIR_FORK_EPOCH:
node.forkDigests.phase0
else:
node.forkDigests.altair
proc getWallEpoch(node: Eth2Node): Epoch =
node.getBeaconTime().slotOrZero.epoch
proc sendAttestation*(
node: Eth2Node, subnet_id: SubnetId, attestation: Attestation) =
# Regardless of the contents of the attestation,
# https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-beta.2/specs/altair/p2p-interface.md#transitioning-the-gossip
# implies that pre-fork, messages using post-fork digests might be
# ignored, whilst post-fork, there is effectively a seen_ttl-based
# timer unsubscription point that means no new pre-fork-forkdigest
# should be sent.
let forkPrefix = node.forkDigestAtEpoch(node.getWallEpoch)
node.broadcast(
getAttestationTopic(forkPrefix, subnet_id),
attestation)
proc sendVoluntaryExit*(node: Eth2Node, exit: SignedVoluntaryExit) =
let exitsTopic = getVoluntaryExitsTopic(
node.forkDigestAtEpoch(node.getWallEpoch))
node.broadcast(exitsTopic, exit)
proc sendAttesterSlashing*(node: Eth2Node, slashing: AttesterSlashing) =
let attesterSlashingsTopic = getAttesterSlashingsTopic(
node.forkDigestAtEpoch(node.getWallEpoch))
node.broadcast(attesterSlashingsTopic, slashing)
proc sendProposerSlashing*(node: Eth2Node, slashing: ProposerSlashing) =
let proposerSlashingsTopic = getProposerSlashingsTopic(
node.forkDigestAtEpoch(node.getWallEpoch))
node.broadcast(proposerSlashingsTopic, slashing)

View File

@ -95,6 +95,9 @@ logScope: topics = "beacnde"
const SlashingDbName = "slashing_protection"
# changing this requires physical file rename as well or history is lost.
func getBeaconTimeFn(clock: BeaconClock): GetBeaconTimeFn =
return proc(): BeaconTime = clock.now()
proc init*(T: type BeaconNode,
cfg: RuntimeConfig,
rng: ref BrHmacDrbgContext,
@ -301,9 +304,9 @@ proc init*(T: type BeaconNode,
netKeys = getPersistentNetKeys(rng[], config)
nickname = if config.nodeName == "auto": shortForm(netKeys)
else: config.nodeName
getBeaconTime = dag.beaconClock.getBeaconTimeFn()
network = createEth2Node(
rng, config, netKeys, cfg, dag.forkDigests,
dag.beaconClock.now.slotOrZero.epoch,
rng, config, netKeys, cfg, dag.forkDigests, getBeaconTime,
getStateField(dag.headState.data, genesis_validators_root))
attestationPool = newClone(AttestationPool.init(dag, quarantine))
exitPool = newClone(ExitPool.init(dag, quarantine))
@ -332,11 +335,11 @@ proc init*(T: type BeaconNode,
)
blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
consensusManager, getTime)
consensusManager, getBeaconTime)
processor = Eth2Processor.new(
config.doppelgangerDetection,
blockProcessor, dag, attestationPool, exitPool, validatorPool,
quarantine, rng, getTime)
quarantine, rng, getBeaconTime)
var node = BeaconNode(
nickname: nickname,
@ -394,7 +397,7 @@ proc init*(T: type BeaconNode,
except Exception as exc: raiseAssert exc.msg
node.addRemoteValidators()
network.initBeaconSync(dag, getTime)
network.initBeaconSync(dag, getBeaconTime)
node.updateValidatorMetrics()

View File

@ -833,7 +833,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
AttesterSlashingValidationError,
$vres.error())
res
node.sendAttesterSlashing(slashing)
node.network.sendAttesterSlashing(slashing)
return RestApiResponse.jsonMsgResponse(AttesterSlashingValidationSuccess)
# https://ethereum.github.io/eth2.0-APIs/#/Beacon/getPoolProposerSlashings
@ -867,7 +867,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
ProposerSlashingValidationError,
$vres.error())
res
node.sendProposerSlashing(slashing)
node.network.sendProposerSlashing(slashing)
return RestApiResponse.jsonMsgResponse(ProposerSlashingValidationSuccess)
# https://ethereum.github.io/eth2.0-APIs/#/Beacon/getPoolVoluntaryExits
@ -901,7 +901,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
VoluntaryExitValidationError,
$vres.error())
res
node.sendVoluntaryExit(exit)
node.network.sendVoluntaryExit(exit)
return RestApiResponse.jsonMsgResponse(VoluntaryExitValidationSuccess)
router.redirect(

View File

@ -489,7 +489,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
raise newException(CatchableError, "Exit pool is not yet available!")
let validity = node.exitPool[].validateAttesterSlashing(slashing)
if validity.isOk:
node.sendAttesterSlashing(slashing)
node.network.sendAttesterSlashing(slashing)
else:
raise newException(CatchableError, $(validity.error[1]))
return true
@ -511,7 +511,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
raise newException(CatchableError, "Exit pool is not yet available!")
let validity = node.exitPool[].validateProposerSlashing(slashing)
if validity.isOk:
node.sendProposerSlashing(slashing)
node.network.sendProposerSlashing(slashing)
else:
raise newException(CatchableError, $(validity.error[1]))
return true
@ -533,7 +533,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
raise newException(CatchableError, "Exit pool is not yet available!")
let validity = node.exitPool[].validateVoluntaryExit(exit)
if validity.isOk:
node.sendVoluntaryExit(exit)
node.network.sendVoluntaryExit(exit)
else:
raise newException(CatchableError, $(validity.error[1]))
return true

View File

@ -47,7 +47,7 @@ type
BeaconSyncNetworkState* = ref object
dag*: ChainDAGRef
getTime*: GetTimeFn
getBeaconTime*: GetBeaconTimeFn
BeaconSyncPeerState* = ref object
statusLastTime*: chronos.Moment
@ -121,8 +121,7 @@ proc getCurrentStatus*(state: BeaconSyncNetworkState): StatusMsg {.gcsafe.} =
let
dag = state.dag
headBlock = dag.head
wallTime = state.getTime()
wallTimeSlot = dag.beaconClock.toBeaconTime(wallTime).slotOrZero
wallTimeSlot = state.getBeaconTime().slotOrZero
StatusMsg(
forkDigest: state.dag.forkDigestAtEpoch(wallTimeSlot.epoch),
@ -358,8 +357,7 @@ p2pProtocol BeaconSync(version = 1,
proc useSyncV2*(state: BeaconSyncNetworkState): bool =
let
wallTime = state.getTime()
wallTimeSlot = state.dag.beaconClock.toBeaconTime(wallTime).slotOrZero
wallTimeSlot = state.getBeaconTime().slotOrZero
wallTimeSlot.epoch >= state.dag.cfg.ALTAIR_FORK_EPOCH
@ -407,7 +405,7 @@ proc handleStatus(peer: Peer,
await peer.handlePeer()
proc initBeaconSync*(network: Eth2Node, dag: ChainDAGRef,
getTime: GetTimeFn) =
getBeaconTime: GetBeaconTimeFn) =
var networkState = network.protocolState(BeaconSync)
networkState.dag = dag
networkState.getTime = getTime
networkState.getBeaconTime = getBeaconTime

View File

@ -170,10 +170,7 @@ proc sendAttestation*(
return case ok
of ValidationResult.Accept:
node.network.broadcast(
# TODO altair-transition
getAttestationTopic(node.dag.forkDigests.phase0, subnet_id),
attestation)
node.network.sendAttestation(subnet_id, attestation)
beacon_attestations_sent.inc()
true
else:
@ -182,21 +179,6 @@ proc sendAttestation*(
result = $ok
false
proc sendVoluntaryExit*(node: BeaconNode, exit: SignedVoluntaryExit) =
# TODO altair-transition
let exitsTopic = getVoluntaryExitsTopic(node.dag.forkDigests.phase0)
node.network.broadcast(exitsTopic, exit)
proc sendAttesterSlashing*(node: BeaconNode, slashing: AttesterSlashing) =
# TODO altair-transition
let attesterSlashingsTopic = getAttesterSlashingsTopic(node.dag.forkDigests.phase0)
node.network.broadcast(attesterSlashingsTopic, slashing)
proc sendProposerSlashing*(node: BeaconNode, slashing: ProposerSlashing) =
# TODO altair-transition
let proposerSlashingsTopic = getProposerSlashingsTopic(node.dag.forkDigests.phase0)
node.network.broadcast(proposerSlashingsTopic, slashing)
proc sendAttestation*(node: BeaconNode, attestation: Attestation): Future[bool] =
# For the validator API, which doesn't supply the subnet id.
let attestationBlck =
@ -523,7 +505,7 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
return head
proc broadcastAggregatedAttestations(
proc sendAggregatedAttestations(
node: BeaconNode, aggregationHead: BlockRef, aggregationSlot: Slot) {.async.} =
# The index is via a
# locally attested validator. Unlike in handleAttestations(...) there's a
@ -740,7 +722,10 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
aggregateWaitTime = shortLog(aggregateWaitTime.offset)
await sleepAsync(aggregateWaitTime.offset)
await broadcastAggregatedAttestations(node, head, slot)
let sendAggregatedAttestationsFut =
sendAggregatedAttestations(node, head, slot)
await sendAggregatedAttestationsFut
if node.eth1Monitor != nil and (slot mod SLOTS_PER_EPOCH) == 0:
let finalizedEpochRef = node.dag.getFinalizedEpochRef()

View File

@ -13,26 +13,26 @@ import ../beacon_chain/networking/peer_pool
import ./testutil
type
PeerTestID* = string
PeerTest* = object
PeerTestID = string
PeerTest = object
id: PeerTestID
weight: int
future: Future[void]
func getKey*(peer: PeerTest): PeerTestID =
func getKey(peer: PeerTest): PeerTestID =
peer.id
func getFuture*(peer: PeerTest): Future[void] =
func getFuture(peer: PeerTest): Future[void] =
peer.future
func `<`*(a, b: PeerTest): bool =
func `<`(a, b: PeerTest): bool =
`<`(a.weight, b.weight)
proc init*(t: typedesc[PeerTest], id: string = "",
weight: int = 0): PeerTest =
PeerTest(id: id, weight: weight, future: newFuture[void]())
proc close*(peer: PeerTest) =
proc close(peer: PeerTest) =
peer.future.complete()
suite "PeerPool testing suite":