split subscribe into non-validating subscribe and addValidator (#1485)

* split subscribe into non-validating subscribe and addValidator

* stop exporting get_committee_assignments
This commit is contained in:
tersec 2020-08-11 15:08:44 +00:00 committed by GitHub
parent 224ebdfd72
commit 22c1ef5a8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 4 deletions

View File

@ -1226,8 +1226,7 @@ func peersCount*(node: Eth2Node): int =
proc subscribe*[MsgType](node: Eth2Node, proc subscribe*[MsgType](node: Eth2Node,
topic: string, topic: string,
msgHandler: proc(msg: MsgType) {.gcsafe.}, msgHandler: proc(msg: MsgType) {.gcsafe.} ) {.async, gcsafe.} =
msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) {.async, gcsafe.} =
proc execMsgHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = proc execMsgHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
inc nbc_gossip_messages_received inc nbc_gossip_messages_received
trace "Incoming pubsub message received", trace "Incoming pubsub message received",
@ -1238,6 +1237,11 @@ proc subscribe*[MsgType](node: Eth2Node,
debug "Gossip msg handler error", debug "Gossip msg handler error",
msg = err.msg, len = data.len, topic, msgId = gossipId(data) msg = err.msg, len = data.len, topic, msgId = gossipId(data)
await node.switch.subscribe(topic & "_snappy", execMsgHandler)
proc addValidator*[MsgType](node: Eth2Node,
topic: string,
msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) =
# Validate messages as soon as subscribed # Validate messages as soon as subscribed
proc execValidator( proc execValidator(
topic: string, message: GossipMsg): Future[bool] {.async, gcsafe.} = topic: string, message: GossipMsg): Future[bool] {.async, gcsafe.} =
@ -1252,7 +1256,15 @@ proc subscribe*[MsgType](node: Eth2Node,
node.switch.addValidator(topic & "_snappy", execValidator) node.switch.addValidator(topic & "_snappy", execValidator)
await node.switch.subscribe(topic & "_snappy", execMsgHandler) proc subscribe*[MsgType](node: Eth2Node,
topic: string,
msgHandler: proc(msg: MsgType) {.gcsafe.},
msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) {.async, gcsafe.} =
node.addValidator(topic, msgValidator)
await node.subscribe(topic, msgHandler)
proc unsubscribe*(node: Eth2Node, topic: string): Future[void] =
node.switch.unsubscribeAll(topic)
proc traceMessage(fut: FutureBase, msgId: string) = proc traceMessage(fut: FutureBase, msgId: string) =
fut.addCallback do (arg: pointer): fut.addCallback do (arg: pointer):

View File

@ -92,7 +92,7 @@ func getAttestationTopic*(forkDigest: ForkDigest,
get_committee_count_per_slot(num_active_validators), get_committee_count_per_slot(num_active_validators),
attestation.data.slot, attestation.data.index.CommitteeIndex)) attestation.data.slot, attestation.data.index.CommitteeIndex))
func get_committee_assignments*( func get_committee_assignments(
state: BeaconState, epoch: Epoch, state: BeaconState, epoch: Epoch,
validator_indices: HashSet[ValidatorIndex]): validator_indices: HashSet[ValidatorIndex]):
seq[tuple[subnetIndex: uint64, slot: Slot]] = seq[tuple[subnetIndex: uint64, slot: Slot]] =