refactor topic (un)subscribing/validating to collate each (#1510)

* refactor topic (un)subscribing/validating to collate each

* fix comment

* tweak comment
This commit is contained in:
tersec 2020-08-17 12:07:29 +00:00 committed by GitHub
parent ed9bec0147
commit 612881b95d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 86 additions and 64 deletions

View File

@ -833,16 +833,7 @@ proc installRpcHandlers(rpcServer: RpcServer, node: BeaconNode) =
rpcServer.installBeaconApiHandlers(node)
rpcServer.installDebugApiHandlers(node)
proc installAttestationHandlers(node: BeaconNode) =
proc attestationHandler(attestation: Attestation) =
# Avoid double-counting attestation-topic attestations on shared codepath
# when they're reflected through beacon blocks
beacon_attestations_received.inc()
beacon_attestation_received_seconds_from_slot_start.observe(
node.beaconClock.now.int64 - attestation.data.slot.toBeaconTime.int64)
node.onAttestation(attestation)
proc installMessageValidators(node: BeaconNode) =
proc attestationValidator(attestation: Attestation,
committeeIndex: uint64): bool =
let (afterGenesis, slot) = node.beaconClock.now().toSlot()
@ -870,6 +861,56 @@ proc installAttestationHandlers(node: BeaconNode) =
attestationValidator(attestation, ci)
)
node.network.addValidator(
getAggregateAndProofsTopic(node.forkDigest),
proc(signedAggregateAndProof: SignedAggregateAndProof): bool =
aggregatedAttestationValidator(signedAggregateAndProof))
node.network.addValidator(node.topicBeaconBlocks) do (signedBlock: SignedBeaconBlock) -> bool:
let
now = node.beaconClock.now
(afterGenesis, slot) = now.toSlot()
if not afterGenesis:
return false
logScope:
blk = shortLog(signedBlock.message)
root = shortLog(signedBlock.root)
let isKnown = signedBlock.root in node.chainDag.blocks
if isKnown:
trace "Received known gossip block"
# TODO:
# Potentially use a fast exit here. We only need to check that
# the contents of the incoming message match our previously seen
# version of the block. We don't need to use HTR for this - for
# better efficiency we can use vanilla SHA256 or direct comparison
# if we still have the previous block in memory.
# TODO:
# We are seeing extreme delays sometimes (e.g. 300 seconds).
# Should we drop such blocks? The spec doesn't set a policy on this.
else:
let delay = (now.int64 - signedBlock.message.slot.toBeaconTime.int64)
debug "Incoming gossip block", delay
beacon_block_received_seconds_from_slot_start.observe delay
let blck = node.chainDag.isValidBeaconBlock(node.quarantine,
signedBlock, slot, {})
node.dumpBlock(signedBlock, blck)
blck.isOk
proc getAttestationHandlers(node: BeaconNode): Future[void] =
proc attestationHandler(attestation: Attestation) =
# Avoid double-counting attestation-topic attestations on shared codepath
# when they're reflected through beacon blocks
beacon_attestations_received.inc()
beacon_attestation_received_seconds_from_slot_start.observe(
node.beaconClock.now.int64 - attestation.data.slot.toBeaconTime.int64)
node.onAttestation(attestation)
var initialSubnets: set[uint8]
for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
initialSubnets.incl i
@ -881,15 +922,40 @@ proc installAttestationHandlers(node: BeaconNode) =
GENESIS_EPOCH + getStabilitySubnetLength()
# Relative to epoch 0, this sets the "current" attestation subnets.
# TODO can't assume GENESIS_EPOCH after gossip turned off/on
node.attestationSubnets.subscribedSubnets[1 - (GENESIS_EPOCH mod 2)] =
initialSubnets
waitFor node.network.subscribe(
node.network.subscribe(
getAggregateAndProofsTopic(node.forkDigest),
proc(signedAggregateAndProof: SignedAggregateAndProof) =
attestationHandler(signedAggregateAndProof.message.aggregate),
proc(signedAggregateAndProof: SignedAggregateAndProof): bool =
aggregatedAttestationValidator(signedAggregateAndProof))
attestationHandler(signedAggregateAndProof.message.aggregate))
proc addMessageHandlers(node: BeaconNode) =
waitFor allFutures(
# As a side-effect, this gets the attestation subnets too.
node.network.subscribe(node.topicBeaconBlocks) do (signedBlock: SignedBeaconBlock):
onBeaconBlock(node, signedBlock),
node.getAttestationHandlers()
)
proc removeMessageHandlers(node: BeaconNode) =
var unsubscriptions: seq[Future[void]]
for topic in [
getBeaconBlocksTopic(node.forkDigest),
getVoluntaryExitsTopic(node.forkDigest),
getProposerSlashingsTopic(node.forkDigest),
getAttesterSlashingsTopic(node.forkDigest),
getAggregateAndProofsTopic(node.forkDigest)]:
unsubscriptions.add node.network.unsubscribe(topic)
for subnet in 0'u64 ..< ATTESTATION_SUBNET_COUNT:
unsubscriptions.add node.network.unsubscribe(
getAttestationTopic(node.forkDigest, subnet))
waitFor allFutures(unsubscriptions)
proc stop*(node: BeaconNode) =
status = BeaconNodeStatus.Stopping
@ -905,44 +971,8 @@ proc run*(node: BeaconNode) =
node.rpcServer.installRpcHandlers(node)
node.rpcServer.start()
waitFor node.network.subscribe(node.topicBeaconBlocks) do (signedBlock: SignedBeaconBlock):
onBeaconBlock(node, signedBlock)
do (signedBlock: SignedBeaconBlock) -> bool:
let
now = node.beaconClock.now
(afterGenesis, slot) = now.toSlot()
if not afterGenesis:
return false
logScope:
blk = shortLog(signedBlock.message)
root = shortLog(signedBlock.root)
let isKnown = signedBlock.root in node.chainDag.blocks
if isKnown:
trace "Received known gossip block"
# TODO:
# Potentially use a fast exit here. We only need to check that
# the contents of the incoming message match our previously seen
# version of the block. We don't need to use HTR for this - for
# better efficiency we can use vanilla SHA256 or direct comparison
# if we still have the previous block in memory.
# TODO:
# We are seeing extreme delays sometimes (e.g. 300 seconds).
# Should we drop such blocks? The spec doesn't set a policy on this.
else:
let delay = (now.int64 - signedBlock.message.slot.toBeaconTime.int64)
debug "Incoming gossip block", delay
beacon_block_received_seconds_from_slot_start.observe delay
let blck = node.chainDag.isValidBeaconBlock(node.quarantine,
signedBlock, slot, {})
node.dumpBlock(signedBlock, blck)
blck.isOk
installAttestationHandlers(node)
node.installMessageValidators()
node.addMessageHandlers()
let
curSlot = node.beaconClock.now().slotOrZero()

View File

@ -1264,13 +1264,6 @@ proc addValidator*[MsgType](node: Eth2Node,
node.switch.addValidator(topic & "_snappy", execValidator)
proc subscribe*[MsgType](node: Eth2Node,
topic: string,
msgHandler: proc(msg: MsgType) {.gcsafe.},
msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) {.async, gcsafe.} =
node.addValidator(topic, msgValidator)
await node.subscribe(topic, msgHandler)
proc unsubscribe*(node: Eth2Node, topic: string): Future[void] =
node.switch.unsubscribeAll(topic)

View File

@ -75,7 +75,7 @@ func compute_subnet_for_attestation*(
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/validator.md#broadcast-attestation
func getAttestationTopic*(forkDigest: ForkDigest, subnetIndex: uint64):
string =
# This is for subscribing or broadcasting manually to a known index.
## For subscribing and unsubscribing to/from a subnet.
doAssert subnetIndex < ATTESTATION_SUBNET_COUNT
try:

View File

@ -259,14 +259,13 @@ func get_finality_delay(state: BeaconState): uint64 =
func is_in_inactivity_leak(state: BeaconState): bool =
get_finality_delay(state) > MIN_EPOCHS_TO_INACTIVITY_PENALTY
func get_eligible_validator_indices(state: BeaconState): seq[ValidatorIndex] =
# TODO iterator/yield, also, probably iterates multiple times over epoch
# transitions
iterator get_eligible_validator_indices(state: BeaconState): ValidatorIndex =
# TODO probably iterates multiple times over epoch transitions
let previous_epoch = get_previous_epoch(state)
for idx, v in state.validators:
if is_active_validator(v, previous_epoch) or
(v.slashed and previous_epoch + 1 < v.withdrawable_epoch):
result.add idx.ValidatorIndex
yield idx.ValidatorIndex
func get_attestation_component_deltas(state: BeaconState,
attestations: seq[PendingAttestation],