diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index dfd5d642b..f3609cd6d 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -363,7 +363,8 @@ func verifyFinalization(node: BeaconNode, slot: Slot) = # finalization occurs every slot, to 4 slots vs scheduledSlot. doAssert finalizedEpoch + 4 >= epoch -proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) = +proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) + {.async} = var attestationSubscriptions: seq[Future[void]] = @[] # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#attestations-and-aggregation @@ -371,14 +372,14 @@ proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) = attestationSubscriptions.add(node.network.subscribe( getAttestationTopic(node.forkDigest, subnet))) - waitFor allFutures(attestationSubscriptions) + await allFutures(attestationSubscriptions) # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#metadata node.network.metadata.seq_number += 1 for subnet in subnets: node.network.metadata.attnets[subnet] = true -proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) = +proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) {.async.} = static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1 let epochParity = slot.epoch mod 2 @@ -413,7 +414,7 @@ proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) = unsubscriptions.add(node.network.unsubscribe( getAttestationTopic(node.forkDigest, expiringSubnet))) - waitFor allFutures(unsubscriptions) + await allFutures(unsubscriptions) # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#metadata # The race condition window is smaller by placing the fast, local, and @@ -422,7 +423,7 @@ proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) = for expiringSubnet in expiringSubnets: node.network.metadata.attnets[expiringSubnet] = false - node.installAttestationSubnetHandlers(newSubnets) + await node.installAttestationSubnetHandlers(newSubnets) block: let subscribed_subnets = @@ -432,11 +433,10 @@ proc cycleAttestationSubnets(node: BeaconNode, slot: Slot) = for subnet in 0'u8 ..< ATTESTATION_SUBNET_COUNT: node.network.metadata.attnets[subnet] = subnet in subscribed_subnets -proc getAttestationHandlers(node: BeaconNode): Future[void] = +proc getAttestationSubnetHandlers(node: BeaconNode): Future[void] = var initialSubnets: set[uint8] for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT: initialSubnets.incl i - node.installAttestationSubnetHandlers(initialSubnets) # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/validator.md#phase-0-attestation-subnet-stability let wallEpoch = node.beaconClock.now().slotOrZero().epoch @@ -449,7 +449,7 @@ proc getAttestationHandlers(node: BeaconNode): Future[void] = node.attestationSubnets.subscribedSubnets[0] = initialSubnets node.attestationSubnets.subscribedSubnets[1] = initialSubnets - node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest)) + node.installAttestationSubnetHandlers(initialSubnets) proc addMessageHandlers(node: BeaconNode): Future[void] = allFutures( @@ -458,8 +458,8 @@ proc addMessageHandlers(node: BeaconNode): Future[void] = node.network.subscribe(getAttesterSlashingsTopic(node.forkDigest)), node.network.subscribe(getProposerSlashingsTopic(node.forkDigest)), node.network.subscribe(getVoluntaryExitsTopic(node.forkDigest)), - - node.getAttestationHandlers() + node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest)), + node.getAttestationSubnetHandlers() ) func getTopicSubscriptionEnabled(node: BeaconNode): bool = @@ -534,7 +534,7 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} = # Subscription or unsubscription might have occurred; recheck if slot.isEpoch and node.getTopicSubscriptionEnabled: - node.cycleAttestationSubnets(slot) + await node.cycleAttestationSubnets(slot) proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} = ## Called at the beginning of a slot - usually every slot, but sometimes might