diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index d28999db0..ed980dc17 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -1226,8 +1226,7 @@ func peersCount*(node: Eth2Node): int = proc subscribe*[MsgType](node: Eth2Node, topic: string, - msgHandler: proc(msg: MsgType) {.gcsafe.}, - msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) {.async, gcsafe.} = + msgHandler: proc(msg: MsgType) {.gcsafe.} ) {.async, gcsafe.} = proc execMsgHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = inc nbc_gossip_messages_received trace "Incoming pubsub message received", @@ -1238,6 +1237,11 @@ proc subscribe*[MsgType](node: Eth2Node, debug "Gossip msg handler error", msg = err.msg, len = data.len, topic, msgId = gossipId(data) + await node.switch.subscribe(topic & "_snappy", execMsgHandler) + +proc addValidator*[MsgType](node: Eth2Node, + topic: string, + msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) = # Validate messages as soon as subscribed proc execValidator( topic: string, message: GossipMsg): Future[bool] {.async, gcsafe.} = @@ -1252,7 +1256,15 @@ proc subscribe*[MsgType](node: Eth2Node, node.switch.addValidator(topic & "_snappy", execValidator) - await node.switch.subscribe(topic & "_snappy", execMsgHandler) +proc subscribe*[MsgType](node: Eth2Node, + topic: string, + msgHandler: proc(msg: MsgType) {.gcsafe.}, + msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) {.async, gcsafe.} = + node.addValidator(topic, msgValidator) + await node.subscribe(topic, msgHandler) + +proc unsubscribe*(node: Eth2Node, topic: string): Future[void] = + node.switch.unsubscribeAll(topic) proc traceMessage(fut: FutureBase, msgId: string) = fut.addCallback do (arg: pointer): diff --git a/beacon_chain/spec/network.nim b/beacon_chain/spec/network.nim index a65e2d6f0..8ffb3bf23 100644 --- a/beacon_chain/spec/network.nim +++ b/beacon_chain/spec/network.nim @@ -92,7 +92,7 @@ func getAttestationTopic*(forkDigest: ForkDigest, get_committee_count_per_slot(num_active_validators), attestation.data.slot, attestation.data.index.CommitteeIndex)) -func get_committee_assignments*( +func get_committee_assignments( state: BeaconState, epoch: Epoch, validator_indices: HashSet[ValidatorIndex]): seq[tuple[subnetIndex: uint64, slot: Slot]] =