mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-02-17 08:56:45 +00:00
remove waitFor in attestation subnet cycling
This commit is contained in:
parent
26ea76bbdf
commit
32a18769e6
@ -363,7 +363,8 @@ func verifyFinalization(node: BeaconNode, slot: Slot) =
|
|||||||
# finalization occurs every slot, to 4 slots vs scheduledSlot.
|
# finalization occurs every slot, to 4 slots vs scheduledSlot.
|
||||||
doAssert finalizedEpoch + 4 >= epoch
|
doAssert finalizedEpoch + 4 >= epoch
|
||||||
|
|
||||||
proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) =
|
proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8])
|
||||||
|
{.async} =
|
||||||
var attestationSubscriptions: seq[Future[void]] = @[]
|
var attestationSubscriptions: seq[Future[void]] = @[]
|
||||||
|
|
||||||
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#attestations-and-aggregation
|
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#attestations-and-aggregation
|
||||||
@ -371,14 +372,14 @@ proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) =
|
|||||||
attestationSubscriptions.add(node.network.subscribe(
|
attestationSubscriptions.add(node.network.subscribe(
|
||||||
getAttestationTopic(node.forkDigest, subnet)))
|
getAttestationTopic(node.forkDigest, subnet)))
|
||||||
|
|
||||||
waitFor allFutures(attestationSubscriptions)
|
await allFutures(attestationSubscriptions)
|
||||||
|
|
||||||
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#metadata
|
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#metadata
|
||||||
node.network.metadata.seq_number += 1
|
node.network.metadata.seq_number += 1
|
||||||
for subnet in subnets:
|
for subnet in subnets:
|
||||||
node.network.metadata.attnets[subnet] = true
|
node.network.metadata.attnets[subnet] = true
|
||||||
|
|
||||||
proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) =
|
proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) {.async.} =
|
||||||
static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1
|
static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1
|
||||||
|
|
||||||
let epochParity = slot.epoch mod 2
|
let epochParity = slot.epoch mod 2
|
||||||
@ -413,7 +414,7 @@ proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) =
|
|||||||
unsubscriptions.add(node.network.unsubscribe(
|
unsubscriptions.add(node.network.unsubscribe(
|
||||||
getAttestationTopic(node.forkDigest, expiringSubnet)))
|
getAttestationTopic(node.forkDigest, expiringSubnet)))
|
||||||
|
|
||||||
waitFor allFutures(unsubscriptions)
|
await allFutures(unsubscriptions)
|
||||||
|
|
||||||
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#metadata
|
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#metadata
|
||||||
# The race condition window is smaller by placing the fast, local, and
|
# The race condition window is smaller by placing the fast, local, and
|
||||||
@ -422,7 +423,7 @@ proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) =
|
|||||||
for expiringSubnet in expiringSubnets:
|
for expiringSubnet in expiringSubnets:
|
||||||
node.network.metadata.attnets[expiringSubnet] = false
|
node.network.metadata.attnets[expiringSubnet] = false
|
||||||
|
|
||||||
node.installAttestationSubnetHandlers(newSubnets)
|
await node.installAttestationSubnetHandlers(newSubnets)
|
||||||
|
|
||||||
block:
|
block:
|
||||||
let subscribed_subnets =
|
let subscribed_subnets =
|
||||||
@ -432,11 +433,10 @@ proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) =
|
|||||||
for subnet in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
|
for subnet in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
|
||||||
node.network.metadata.attnets[subnet] = subnet in subscribed_subnets
|
node.network.metadata.attnets[subnet] = subnet in subscribed_subnets
|
||||||
|
|
||||||
proc getAttestationHandlers(node: BeaconNode): Future[void] =
|
proc getAttestationSubnetHandlers(node: BeaconNode): Future[void] =
|
||||||
var initialSubnets: set[uint8]
|
var initialSubnets: set[uint8]
|
||||||
for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
|
for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
|
||||||
initialSubnets.incl i
|
initialSubnets.incl i
|
||||||
node.installAttestationSubnetHandlers(initialSubnets)
|
|
||||||
|
|
||||||
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability
|
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability
|
||||||
let wallEpoch = node.beaconClock.now().slotOrZero().epoch
|
let wallEpoch = node.beaconClock.now().slotOrZero().epoch
|
||||||
@ -449,7 +449,7 @@ proc getAttestationHandlers(node: BeaconNode): Future[void] =
|
|||||||
node.attestationSubnets.subscribedSubnets[0] = initialSubnets
|
node.attestationSubnets.subscribedSubnets[0] = initialSubnets
|
||||||
node.attestationSubnets.subscribedSubnets[1] = initialSubnets
|
node.attestationSubnets.subscribedSubnets[1] = initialSubnets
|
||||||
|
|
||||||
node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest))
|
node.installAttestationSubnetHandlers(initialSubnets)
|
||||||
|
|
||||||
proc addMessageHandlers(node: BeaconNode): Future[void] =
|
proc addMessageHandlers(node: BeaconNode): Future[void] =
|
||||||
allFutures(
|
allFutures(
|
||||||
@ -458,8 +458,8 @@ proc addMessageHandlers(node: BeaconNode): Future[void] =
|
|||||||
node.network.subscribe(getAttesterSlashingsTopic(node.forkDigest)),
|
node.network.subscribe(getAttesterSlashingsTopic(node.forkDigest)),
|
||||||
node.network.subscribe(getProposerSlashingsTopic(node.forkDigest)),
|
node.network.subscribe(getProposerSlashingsTopic(node.forkDigest)),
|
||||||
node.network.subscribe(getVoluntaryExitsTopic(node.forkDigest)),
|
node.network.subscribe(getVoluntaryExitsTopic(node.forkDigest)),
|
||||||
|
node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest)),
|
||||||
node.getAttestationHandlers()
|
node.getAttestationSubnetHandlers()
|
||||||
)
|
)
|
||||||
|
|
||||||
func getTopicSubscriptionEnabled(node: BeaconNode): bool =
|
func getTopicSubscriptionEnabled(node: BeaconNode): bool =
|
||||||
@ -534,7 +534,7 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
|
|||||||
|
|
||||||
# Subscription or unsubscription might have occurred; recheck
|
# Subscription or unsubscription might have occurred; recheck
|
||||||
if slot.isEpoch and node.getTopicSubscriptionEnabled:
|
if slot.isEpoch and node.getTopicSubscriptionEnabled:
|
||||||
node.cycleAttestationSubnets(slot)
|
await node.cycleAttestationSubnets(slot)
|
||||||
|
|
||||||
proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
|
proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
|
||||||
## Called at the beginning of a slot - usually every slot, but sometimes might
|
## Called at the beginning of a slot - usually every slot, but sometimes might
|
||||||
|
Loading…
x
Reference in New Issue
Block a user