diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index c690d1ad3..4908bf4b3 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -1022,23 +1022,23 @@ proc subscribe*[MsgType](node: Eth2Node, topic: string, msgHandler: proc(msg: MsgType) {.gcsafe.}, msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) {.async, gcsafe.} = - template execMsgHandler(peerExpr, gossipBytes, gossipTopic) = + template execMsgHandler(peerExpr, gossipBytes, gossipTopic, useSnappy) = inc gossip_messages_received trace "Incoming pubsub message received", peer = peerExpr, len = gossipBytes.len, topic = gossipTopic, message_id = `$`(sha256.digest(gossipBytes)) - if gossipTopic.endsWith("_snappy"): + if useSnappy: msgHandler SSZ.decode(snappy.decode(gossipBytes), MsgType) else: msgHandler SSZ.decode(gossipBytes, MsgType) # All message types which are subscribed to should be validated; putting # this in subscribe(...) ensures that the default approach is correct. - template execMsgValidator(gossipBytes, gossipTopic): bool = + template execMsgValidator(gossipBytes, gossipTopic, useSnappy): bool = trace "Incoming pubsub message received for validation", len = gossipBytes.len, topic = gossipTopic, message_id = `$`(sha256.digest(gossipBytes)) - if gossipTopic.endsWith("_snappy"): + if useSnappy: msgValidator SSZ.decode(snappy.decode(gossipBytes), MsgType) else: msgValidator SSZ.decode(gossipBytes, MsgType) @@ -1047,20 +1047,27 @@ proc subscribe*[MsgType](node: Eth2Node, let incomingMsgValidator = proc(topic: string, message: GossipMsg): Future[bool] {.async, gcsafe.} = - return execMsgValidator(message.data, topic) + return execMsgValidator(message.data, topic, false) + let incomingMsgValidatorSnappy = proc(topic: string, + message: GossipMsg): Future[bool] + {.async, gcsafe.} = + return execMsgValidator(message.data, topic, true) node.switch.addValidator(topic, incomingMsgValidator) - node.switch.addValidator(topic & "_snappy", incomingMsgValidator) + node.switch.addValidator(topic & "_snappy", incomingMsgValidatorSnappy) let incomingMsgHandler = proc(topic: string, data: seq[byte]) {.async, gcsafe.} = - execMsgHandler "unknown", data, topic + execMsgHandler "unknown", data, topic, false + let incomingMsgHandlerSnappy = proc(topic: string, + data: seq[byte]) {.async, gcsafe.} = + execMsgHandler "unknown", data, topic, true var switchSubscriptions: seq[Future[void]] = @[] switchSubscriptions.add(node.switch.subscribe(topic, incomingMsgHandler)) - switchSubscriptions.add(node.switch.subscribe(topic & "_snappy", incomingMsgHandler)) + switchSubscriptions.add(node.switch.subscribe(topic & "_snappy", incomingMsgHandlerSnappy)) - waitFor allFutures(switchSubscriptions) + await allFutures(switchSubscriptions) proc traceMessage(fut: FutureBase, digest: MDigest[256]) = fut.addCallback do (arg: pointer): @@ -1074,10 +1081,10 @@ proc broadcast*(node: Eth2Node, topic: string, msg: auto) = traceMessage(fut, sha256.digest(broadcastBytes)) traceAsyncErrors(fut) # also publish to the snappy-compressed topics - let snappy_encoded = snappy.encode(broadcastBytes) - var fut_snappy = node.switch.publish(topic & "_snappy", snappy_encoded) - traceMessage(fut_snappy, sha256.digest(snappy_encoded)) - traceAsyncErrors(fut_snappy) + let snappyEncoded = snappy.encode(broadcastBytes) + var futSnappy = node.switch.publish(topic & "_snappy", snappyEncoded) + traceMessage(futSnappy, sha256.digest(snappyEncoded)) + traceAsyncErrors(futSnappy) # TODO: # At the moment, this is just a compatiblity shim for the existing RLPx functionality.