diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 9dcf56e30..a4c6015ad 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -1591,11 +1591,11 @@ proc announcedENR*(node: Eth2Node): enr.Record = proc shortForm*(id: KeyPair): string = $PeerID.init(id.pubkey) -proc subscribe*(node: Eth2Node, topic: string) {.async.} = +proc subscribe*(node: Eth2Node, topic: string) = proc dummyMsgHandler(topic: string, data: seq[byte]) {.async.} = discard - await node.pubsub.subscribe(topic & "_snappy", dummyMsgHandler) + node.pubsub.subscribe(topic & "_snappy", dummyMsgHandler) proc addValidator*[MsgType](node: Eth2Node, topic: string, @@ -1625,8 +1625,8 @@ proc addValidator*[MsgType](node: Eth2Node, node.pubsub.addValidator(topic & "_snappy", execValidator) -proc unsubscribe*(node: Eth2Node, topic: string): Future[void] = - node.pubsub.unsubscribeAll(topic) +proc unsubscribe*(node: Eth2Node, topic: string) = + node.pubsub.unsubscribeAll(topic & "_snappy") proc traceMessage(fut: FutureBase, msgId: seq[byte]) = fut.addCallback do (arg: pointer): diff --git a/beacon_chain/inspector.nim b/beacon_chain/inspector.nim index 6b910aab2..0486034d2 100644 --- a/beacon_chain/inspector.nim +++ b/beacon_chain/inspector.nim @@ -720,11 +720,11 @@ proc run(conf: InspectorConf) {.async.} = try: for filter in topics: for topic in getTopics(forkDigest.get(), filter): - await pubsub.subscribe(topic, pubsubTrampoline) + pubsub.subscribe(topic, pubsubTrampoline) topicFilters.add(topic) trace "Subscribed to topic", topic = topic for filter in conf.customTopics: - await pubsub.subscribe(filter, pubsubTrampoline) + pubsub.subscribe(filter, pubsubTrampoline) topicFilters.add(filter) trace "Subscribed to custom topic", topic = filter except CatchableError as exc: diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index b65cba86f..ddd62111c 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -7,7 +7,7 @@ import # Standard library - std/[os, tables, strutils, strformat, sequtils, times, math, + std/[os, tables, strutils, strformat, times, math, terminal, osproc, random], system/ansi_c, @@ -348,16 +348,10 @@ func verifyFinalization(node: BeaconNode, slot: Slot) = # finalization occurs every slot, to 4 slots vs scheduledSlot. doAssert finalizedEpoch + 4 >= epoch -proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) - {.async} = - var attestationSubscriptions: seq[Future[void]] = @[] - +proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) = # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#attestations-and-aggregation for subnet in subnets: - attestationSubscriptions.add(node.network.subscribe( - getAttestationTopic(node.forkDigest, subnet))) - - await allFutures(attestationSubscriptions) + node.network.subscribe(getAttestationTopic(node.forkDigest, subnet)) proc updateStabilitySubnetMetadata( node: BeaconNode, stabilitySubnets: set[uint8]) = @@ -381,7 +375,7 @@ func getStabilitySubnets(stabilitySubnets: auto): set[uint8] = for subnetInfo in stabilitySubnets: result.incl subnetInfo.subnet -proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} = +proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) = static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1 # Only know RANDAO mix, which determines shuffling seed, one epoch in @@ -435,21 +429,17 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} = num_stability_subnets = node.attestationSubnets.stabilitySubnets.len block: - var unsubscriptions: seq[Future[void]] = @[] for expiringSubnet in expiringSubnets: - unsubscriptions.add(node.network.unsubscribe( - getAttestationTopic(node.forkDigest, expiringSubnet))) + node.network.unsubscribe(getAttestationTopic(node.forkDigest, expiringSubnet)) - await allFutures(unsubscriptions) - - await node.installAttestationSubnetHandlers(newSubnets) + node.installAttestationSubnetHandlers(newSubnets) let stabilitySubnets = getStabilitySubnets(node.attestationSubnets.stabilitySubnets) if stabilitySubnets != prevStabilitySubnets: node.updateStabilitySubnetMetadata(stabilitySubnets) -proc getAttestationSubnetHandlers(node: BeaconNode): Future[void] = +proc getAttestationSubnetHandlers(node: BeaconNode) = var initialSubnets: set[uint8] for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT: initialSubnets.incl i @@ -483,41 +473,35 @@ proc getAttestationSubnetHandlers(node: BeaconNode): Future[void] = wallEpoch node.installAttestationSubnetHandlers(initialSubnets) -proc addMessageHandlers(node: BeaconNode): Future[void] = - allFutures( - # As a side-effect, this gets the attestation subnets too. - node.network.subscribe(node.topicBeaconBlocks), - node.network.subscribe(getAttesterSlashingsTopic(node.forkDigest)), - node.network.subscribe(getProposerSlashingsTopic(node.forkDigest)), - node.network.subscribe(getVoluntaryExitsTopic(node.forkDigest)), - node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest)), - node.getAttestationSubnetHandlers() - ) +proc addMessageHandlers(node: BeaconNode) = + # As a side-effect, this gets the attestation subnets too. + node.network.subscribe(node.topicBeaconBlocks) + node.network.subscribe(getAttesterSlashingsTopic(node.forkDigest)) + node.network.subscribe(getProposerSlashingsTopic(node.forkDigest)) + node.network.subscribe(getVoluntaryExitsTopic(node.forkDigest)) + node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest)) + node.getAttestationSubnetHandlers() + func getTopicSubscriptionEnabled(node: BeaconNode): bool = node.attestationSubnets.enabled -proc removeMessageHandlers(node: BeaconNode): Future[void] = +proc removeMessageHandlers(node: BeaconNode) = node.attestationSubnets.subscribedSubnets[0] = {} node.attestationSubnets.subscribedSubnets[1] = {} node.attestationSubnets.enabled = false doAssert not node.getTopicSubscriptionEnabled() - var unsubscriptions = mapIt( - [getBeaconBlocksTopic(node.forkDigest), - getVoluntaryExitsTopic(node.forkDigest), - getProposerSlashingsTopic(node.forkDigest), - getAttesterSlashingsTopic(node.forkDigest), - getAggregateAndProofsTopic(node.forkDigest)], - node.network.unsubscribe(it)) + node.network.unsubscribe(getBeaconBlocksTopic(node.forkDigest)) + node.network.unsubscribe(getVoluntaryExitsTopic(node.forkDigest)) + node.network.unsubscribe(getProposerSlashingsTopic(node.forkDigest)) + node.network.unsubscribe(getAttesterSlashingsTopic(node.forkDigest)) + node.network.unsubscribe(getAggregateAndProofsTopic(node.forkDigest)) for subnet in 0'u64 ..< ATTESTATION_SUBNET_COUNT: - unsubscriptions.add node.network.unsubscribe( - getAttestationTopic(node.forkDigest, subnet)) + node.network.unsubscribe(getAttestationTopic(node.forkDigest, subnet)) - allFutures(unsubscriptions) - -proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} = +proc updateGossipStatus(node: BeaconNode, slot: Slot) = # Syncing tends to be ~1 block/s, and allow for an epoch of time for libp2p # subscribing to spin up. The faster the sync, the more wallSlot - headSlot # lead time is required @@ -550,7 +534,7 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} = headSlot = node.chainDag.head.slot, syncQueueLen - await node.addMessageHandlers() + node.addMessageHandlers() doAssert node.getTopicSubscriptionEnabled() elif topicSubscriptionEnabled and @@ -562,14 +546,14 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} = wallSlot = slot, headSlot = node.chainDag.head.slot, syncQueueLen - await node.removeMessageHandlers() + node.removeMessageHandlers() # Subscription or unsubscription might have occurred; recheck. if node.getTopicSubscriptionEnabled: # This exits early all but one call each epoch. - await node.cycleAttestationSubnets(slot) + node.cycleAttestationSubnets(slot) -proc onSlotEnd(node: BeaconNode, slot, nextSlot: Slot): Future[void] = +proc onSlotEnd(node: BeaconNode, slot, nextSlot: Slot) = # Things we do when slot processing has ended and we're about to wait for the # next slot @@ -657,7 +641,7 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} = slot = wallSlot.slot # afterGenesis == true! nextSlot = slot + 1 - defer: await onSlotEnd(node, slot, nextSlot) + defer: onSlotEnd(node, slot, nextSlot) beacon_slot.set slot.int64 beacon_current_epoch.set slot.epoch.int64 @@ -865,7 +849,7 @@ proc run*(node: BeaconNode) = node.requestManager.start() node.startSyncManager() - waitFor node.addMessageHandlers() + node.addMessageHandlers() doAssert node.getTopicSubscriptionEnabled() ## Ctrl+C handling diff --git a/beacon_chain/rpc/validator_api.nim b/beacon_chain/rpc/validator_api.nim index 7b1549e5d..59801ccad 100644 --- a/beacon_chain/rpc/validator_api.nim +++ b/beacon_chain/rpc/validator_api.nim @@ -149,8 +149,7 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) = get_committee_count_per_slot(epochRef), slot, committee_index).uint8 if subnet notin node.attestationSubnets.subscribedSubnets[0] and subnet notin node.attestationSubnets.subscribedSubnets[1]: - await node.network.subscribe(getAttestationTopic( - node.forkDigest, subnet)) + node.network.subscribe(getAttestationTopic(node.forkDigest, subnet)) # But it might only be in current node.attestationSubnets.subscribedSubnets[0].incl subnet diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index a1a5f9aba..5e79d3ab9 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit a1a5f9abaca41ec37ea2c4d6b34b5c4fcecf7591 +Subproject commit 5e79d3ab9c286ef6bc88049f3eaadd8b1ac02726