merge topic sub/unsub infrastructure (#3099)

This commit is contained in:
tersec 2021-11-14 08:00:25 +00:00 committed by GitHub
parent 8cab7a74e7
commit 6f8eff8f13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 23 additions and 8 deletions

View File

@ -619,8 +619,8 @@ proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
proc removePhase0MessageHandlers(node: BeaconNode) = proc removePhase0MessageHandlers(node: BeaconNode) =
removePhase0MessageHandlers(node, node.dag.forkDigests.phase0) removePhase0MessageHandlers(node, node.dag.forkDigests.phase0)
proc addAltairMessageHandlers(node: BeaconNode, slot: Slot) = proc addAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
node.addPhase0MessageHandlers(node.dag.forkDigests.altair, slot) node.addPhase0MessageHandlers(forkDigest, slot)
var syncnets: SyncnetBits var syncnets: SyncnetBits
@ -629,26 +629,41 @@ proc addAltairMessageHandlers(node: BeaconNode, slot: Slot) =
closureScope: closureScope:
let idx = committeeIdx let idx = committeeIdx
# TODO This should be done in dynamic way in trackSyncCommitteeTopics # TODO This should be done in dynamic way in trackSyncCommitteeTopics
node.network.subscribe(getSyncCommitteeTopic(node.dag.forkDigests.altair, idx), basicParams) node.network.subscribe(getSyncCommitteeTopic(forkDigest, idx), basicParams)
syncnets.setBit(idx.asInt) syncnets.setBit(idx.asInt)
node.network.subscribe(getSyncCommitteeContributionAndProofTopic(node.dag.forkDigests.altair), basicParams) node.network.subscribe(
getSyncCommitteeContributionAndProofTopic(forkDigest), basicParams)
node.network.updateSyncnetsMetadata(syncnets) node.network.updateSyncnetsMetadata(syncnets)
proc removeAltairMessageHandlers(node: BeaconNode) = proc addAltairMessageHandlers(node: BeaconNode, slot: Slot) =
node.removePhase0MessageHandlers(node.dag.forkDigests.altair) addAltairMessageHandlers(node, node.dag.forkDigests.altair, slot)
proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.removePhase0MessageHandlers(forkDigest)
for committeeIdx in allSyncSubcommittees(): for committeeIdx in allSyncSubcommittees():
closureScope: closureScope:
let idx = committeeIdx let idx = committeeIdx
# TODO This should be done in dynamic way in trackSyncCommitteeTopics # TODO This should be done in dynamic way in trackSyncCommitteeTopics
node.network.unsubscribe(getSyncCommitteeTopic(node.dag.forkDigests.altair, idx)) node.network.unsubscribe(getSyncCommitteeTopic(forkDigest, idx))
node.network.unsubscribe(getSyncCommitteeContributionAndProofTopic(node.dag.forkDigests.altair)) node.network.unsubscribe(
getSyncCommitteeContributionAndProofTopic(forkDigest))
proc removeAltairMessageHandlers(node: BeaconNode) =
removeAltairMessageHandlers(node, node.dag.forkDigests.altair)
proc addMergeMessageHandlers(node: BeaconNode, slot: Slot) =
addAltairMessageHandlers(node, node.dag.forkDigests.merge, slot)
proc removeMergeMessageHandlers(node: BeaconNode) =
removeAltairMessageHandlers(node, node.dag.forkDigests.merge)
proc removeAllMessageHandlers(node: BeaconNode) = proc removeAllMessageHandlers(node: BeaconNode) =
node.removePhase0MessageHandlers() node.removePhase0MessageHandlers()
node.removeAltairMessageHandlers() node.removeAltairMessageHandlers()
node.removeMergeMessageHandlers()
proc setupDoppelgangerDetection(node: BeaconNode, slot: Slot) = proc setupDoppelgangerDetection(node: BeaconNode, slot: Slot) =
# When another client's already running, this is very likely to detect # When another client's already running, this is very likely to detect