remove async from sub/unsub (#2197)

* remove await/async from sub/unsub

* fix unsubscribe wrong key (missed _snappy)

* use the right libp2p commit hash

* remove unused async

* fix inspector

* fix subnet calculation in RPC and insert broadcast attestations into node's pool

* unify codepaths to ensure only mostly-checked-to-be-valid attestations enter the pool, even from node's own broadcasts

* update attestation pool tests for new validateAttestation param

Co-authored-by: Dustin Brody <tersec@users.noreply.github.com>
This commit is contained in:
Giovanni Petrantoni 2020-12-24 17:48:52 +09:00 committed by GitHub
parent 97c4f7c5c0
commit ed24f60f70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 38 additions and 55 deletions

View File

@ -1591,11 +1591,11 @@ proc announcedENR*(node: Eth2Node): enr.Record =
proc shortForm*(id: KeyPair): string =
$PeerID.init(id.pubkey)
proc subscribe*(node: Eth2Node, topic: string) {.async.} =
proc subscribe*(node: Eth2Node, topic: string) =
proc dummyMsgHandler(topic: string, data: seq[byte]) {.async.} =
discard
await node.pubsub.subscribe(topic & "_snappy", dummyMsgHandler)
node.pubsub.subscribe(topic & "_snappy", dummyMsgHandler)
proc addValidator*[MsgType](node: Eth2Node,
topic: string,
@ -1625,8 +1625,8 @@ proc addValidator*[MsgType](node: Eth2Node,
node.pubsub.addValidator(topic & "_snappy", execValidator)
proc unsubscribe*(node: Eth2Node, topic: string): Future[void] =
node.pubsub.unsubscribeAll(topic)
proc unsubscribe*(node: Eth2Node, topic: string) =
node.pubsub.unsubscribeAll(topic & "_snappy")
proc traceMessage(fut: FutureBase, msgId: seq[byte]) =
fut.addCallback do (arg: pointer):

View File

@ -720,11 +720,11 @@ proc run(conf: InspectorConf) {.async.} =
try:
for filter in topics:
for topic in getTopics(forkDigest.get(), filter):
await pubsub.subscribe(topic, pubsubTrampoline)
pubsub.subscribe(topic, pubsubTrampoline)
topicFilters.add(topic)
trace "Subscribed to topic", topic = topic
for filter in conf.customTopics:
await pubsub.subscribe(filter, pubsubTrampoline)
pubsub.subscribe(filter, pubsubTrampoline)
topicFilters.add(filter)
trace "Subscribed to custom topic", topic = filter
except CatchableError as exc:

View File

@ -7,7 +7,7 @@
import
# Standard library
std/[os, tables, strutils, strformat, sequtils, times, math,
std/[os, tables, strutils, strformat, times, math,
terminal, osproc, random],
system/ansi_c,
@ -348,16 +348,10 @@ func verifyFinalization(node: BeaconNode, slot: Slot) =
# finalization occurs every slot, to 4 slots vs scheduledSlot.
doAssert finalizedEpoch + 4 >= epoch
proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8])
{.async} =
var attestationSubscriptions: seq[Future[void]] = @[]
proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#attestations-and-aggregation
for subnet in subnets:
attestationSubscriptions.add(node.network.subscribe(
getAttestationTopic(node.forkDigest, subnet)))
await allFutures(attestationSubscriptions)
node.network.subscribe(getAttestationTopic(node.forkDigest, subnet))
proc updateStabilitySubnetMetadata(
node: BeaconNode, stabilitySubnets: set[uint8]) =
@ -381,7 +375,7 @@ func getStabilitySubnets(stabilitySubnets: auto): set[uint8] =
for subnetInfo in stabilitySubnets:
result.incl subnetInfo.subnet
proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} =
proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) =
static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1
# Only know RANDAO mix, which determines shuffling seed, one epoch in
@ -435,21 +429,17 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} =
num_stability_subnets = node.attestationSubnets.stabilitySubnets.len
block:
var unsubscriptions: seq[Future[void]] = @[]
for expiringSubnet in expiringSubnets:
unsubscriptions.add(node.network.unsubscribe(
getAttestationTopic(node.forkDigest, expiringSubnet)))
node.network.unsubscribe(getAttestationTopic(node.forkDigest, expiringSubnet))
await allFutures(unsubscriptions)
await node.installAttestationSubnetHandlers(newSubnets)
node.installAttestationSubnetHandlers(newSubnets)
let stabilitySubnets =
getStabilitySubnets(node.attestationSubnets.stabilitySubnets)
if stabilitySubnets != prevStabilitySubnets:
node.updateStabilitySubnetMetadata(stabilitySubnets)
proc getAttestationSubnetHandlers(node: BeaconNode): Future[void] =
proc getAttestationSubnetHandlers(node: BeaconNode) =
var initialSubnets: set[uint8]
for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
initialSubnets.incl i
@ -483,41 +473,35 @@ proc getAttestationSubnetHandlers(node: BeaconNode): Future[void] =
wallEpoch
node.installAttestationSubnetHandlers(initialSubnets)
proc addMessageHandlers(node: BeaconNode): Future[void] =
allFutures(
proc addMessageHandlers(node: BeaconNode) =
# As a side-effect, this gets the attestation subnets too.
node.network.subscribe(node.topicBeaconBlocks),
node.network.subscribe(getAttesterSlashingsTopic(node.forkDigest)),
node.network.subscribe(getProposerSlashingsTopic(node.forkDigest)),
node.network.subscribe(getVoluntaryExitsTopic(node.forkDigest)),
node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest)),
node.network.subscribe(node.topicBeaconBlocks)
node.network.subscribe(getAttesterSlashingsTopic(node.forkDigest))
node.network.subscribe(getProposerSlashingsTopic(node.forkDigest))
node.network.subscribe(getVoluntaryExitsTopic(node.forkDigest))
node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest))
node.getAttestationSubnetHandlers()
)
func getTopicSubscriptionEnabled(node: BeaconNode): bool =
node.attestationSubnets.enabled
proc removeMessageHandlers(node: BeaconNode): Future[void] =
proc removeMessageHandlers(node: BeaconNode) =
node.attestationSubnets.subscribedSubnets[0] = {}
node.attestationSubnets.subscribedSubnets[1] = {}
node.attestationSubnets.enabled = false
doAssert not node.getTopicSubscriptionEnabled()
var unsubscriptions = mapIt(
[getBeaconBlocksTopic(node.forkDigest),
getVoluntaryExitsTopic(node.forkDigest),
getProposerSlashingsTopic(node.forkDigest),
getAttesterSlashingsTopic(node.forkDigest),
getAggregateAndProofsTopic(node.forkDigest)],
node.network.unsubscribe(it))
node.network.unsubscribe(getBeaconBlocksTopic(node.forkDigest))
node.network.unsubscribe(getVoluntaryExitsTopic(node.forkDigest))
node.network.unsubscribe(getProposerSlashingsTopic(node.forkDigest))
node.network.unsubscribe(getAttesterSlashingsTopic(node.forkDigest))
node.network.unsubscribe(getAggregateAndProofsTopic(node.forkDigest))
for subnet in 0'u64 ..< ATTESTATION_SUBNET_COUNT:
unsubscriptions.add node.network.unsubscribe(
getAttestationTopic(node.forkDigest, subnet))
node.network.unsubscribe(getAttestationTopic(node.forkDigest, subnet))
allFutures(unsubscriptions)
proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
proc updateGossipStatus(node: BeaconNode, slot: Slot) =
# Syncing tends to be ~1 block/s, and allow for an epoch of time for libp2p
# subscribing to spin up. The faster the sync, the more wallSlot - headSlot
# lead time is required
@ -550,7 +534,7 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
headSlot = node.chainDag.head.slot,
syncQueueLen
await node.addMessageHandlers()
node.addMessageHandlers()
doAssert node.getTopicSubscriptionEnabled()
elif
topicSubscriptionEnabled and
@ -562,14 +546,14 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
wallSlot = slot,
headSlot = node.chainDag.head.slot,
syncQueueLen
await node.removeMessageHandlers()
node.removeMessageHandlers()
# Subscription or unsubscription might have occurred; recheck.
if node.getTopicSubscriptionEnabled:
# This exits early all but one call each epoch.
await node.cycleAttestationSubnets(slot)
node.cycleAttestationSubnets(slot)
proc onSlotEnd(node: BeaconNode, slot, nextSlot: Slot): Future[void] =
proc onSlotEnd(node: BeaconNode, slot, nextSlot: Slot) =
# Things we do when slot processing has ended and we're about to wait for the
# next slot
@ -657,7 +641,7 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
slot = wallSlot.slot # afterGenesis == true!
nextSlot = slot + 1
defer: await onSlotEnd(node, slot, nextSlot)
defer: onSlotEnd(node, slot, nextSlot)
beacon_slot.set slot.int64
beacon_current_epoch.set slot.epoch.int64
@ -865,7 +849,7 @@ proc run*(node: BeaconNode) =
node.requestManager.start()
node.startSyncManager()
waitFor node.addMessageHandlers()
node.addMessageHandlers()
doAssert node.getTopicSubscriptionEnabled()
## Ctrl+C handling

View File

@ -149,8 +149,7 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) =
get_committee_count_per_slot(epochRef), slot, committee_index).uint8
if subnet notin node.attestationSubnets.subscribedSubnets[0] and
subnet notin node.attestationSubnets.subscribedSubnets[1]:
await node.network.subscribe(getAttestationTopic(
node.forkDigest, subnet))
node.network.subscribe(getAttestationTopic(node.forkDigest, subnet))
# But it might only be in current
node.attestationSubnets.subscribedSubnets[0].incl subnet

2
vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit a1a5f9abaca41ec37ea2c4d6b34b5c4fcecf7591
Subproject commit 5e79d3ab9c286ef6bc88049f3eaadd8b1ac02726