From eeccaaf16d4a5ef97e105ba898b3647696d0bad2 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sat, 27 Jun 2020 12:16:43 +0200 Subject: [PATCH] stop gossipping non-snappy (#1240) * stop gossipping non-snappy Also simplify subscription and actually handle decoding errors * log weird states too --- beacon_chain/eth2_network.nim | 76 +++++++++++++---------------------- 1 file changed, 27 insertions(+), 49 deletions(-) diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index b81c981ab..cdc88de46 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -1134,69 +1134,47 @@ proc subscribe*[MsgType](node: Eth2Node, topic: string, msgHandler: proc(msg: MsgType) {.gcsafe.}, msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) {.async, gcsafe.} = - template execMsgHandler(peerExpr, gossipBytes, gossipTopic, useSnappy) = + proc execMsgHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = inc nbc_gossip_messages_received trace "Incoming pubsub message received", - peer = peerExpr, len = gossipBytes.len, topic = gossipTopic, - message_id = `$`(sha256.digest(gossipBytes)) - when useSnappy: - msgHandler SSZ.decode(snappy.decode(gossipBytes), MsgType) - else: - msgHandler SSZ.decode(gossipBytes, MsgType) - - # All message types which are subscribed to should be validated; putting - # this in subscribe(...) ensures that the default approach is correct. - template execMsgValidator(gossipBytes, gossipTopic, useSnappy): bool = - trace "Incoming pubsub message received for validation", - len = gossipBytes.len, topic = gossipTopic, - message_id = `$`(sha256.digest(gossipBytes)) - when useSnappy: - msgValidator SSZ.decode(snappy.decode(gossipBytes), MsgType) - else: - msgValidator SSZ.decode(gossipBytes, MsgType) + len = data.len, topic, message_id = `$`(sha256.digest(data)) + try: + msgHandler SSZ.decode(snappy.decode(data), MsgType) + except CatchableError as err: + debug "Gossip msg handler error", + msg = err.msg, len = data.len, topic, + message_id = `$`(sha256.digest(data)) # Validate messages as soon as subscribed - let incomingMsgValidator = proc(topic: string, - message: GossipMsg): Future[bool] - {.async, gcsafe.} = - return execMsgValidator(message.data, topic, false) - let incomingMsgValidatorSnappy = proc(topic: string, - message: GossipMsg): Future[bool] - {.async, gcsafe.} = - return execMsgValidator(message.data, topic, true) + proc execValidator( + topic: string, message: GossipMsg): Future[bool] {.async, gcsafe.} = + trace "Validating incoming gossip message", + len = message.data.len, topic, message_id = `$`(sha256.digest(message.data)) + try: + return msgValidator SSZ.decode(snappy.decode(message.data), MsgType) + except CatchableError as err: + debug "Gossip validation error", msg = err.msg + return false - node.switch.addValidator(topic, incomingMsgValidator) - node.switch.addValidator(topic & "_snappy", incomingMsgValidatorSnappy) + node.switch.addValidator(topic & "_snappy", execValidator) - let incomingMsgHandler = proc(topic: string, - data: seq[byte]) {.async, gcsafe.} = - execMsgHandler "unknown", data, topic, false - let incomingMsgHandlerSnappy = proc(topic: string, - data: seq[byte]) {.async, gcsafe.} = - execMsgHandler "unknown", data, topic, true - - var switchSubscriptions: seq[Future[void]] = @[] - switchSubscriptions.add(node.switch.subscribe(topic, incomingMsgHandler)) - switchSubscriptions.add(node.switch.subscribe(topic & "_snappy", incomingMsgHandlerSnappy)) - - await allFutures(switchSubscriptions) + await node.switch.subscribe(topic & "_snappy", execMsgHandler) proc traceMessage(fut: FutureBase, digest: MDigest[256]) = fut.addCallback do (arg: pointer): if not(fut.failed): trace "Outgoing pubsub message sent", message_id = `$`(digest) + elif fut.error != nil: + debug "Gossip message not sent", msg = fut.error.msg + else: + debug "Unexpected future state for gossip", state = fut.state proc broadcast*(node: Eth2Node, topic: string, msg: auto) = inc nbc_gossip_messages_sent - let broadcastBytes = SSZ.encode(msg) - var fut = node.switch.publish(topic, broadcastBytes) - traceMessage(fut, sha256.digest(broadcastBytes)) - traceAsyncErrors(fut) - # also publish to the snappy-compressed topics - let snappyEncoded = snappy.encode(broadcastBytes) - var futSnappy = node.switch.publish(topic & "_snappy", snappyEncoded) - traceMessage(futSnappy, sha256.digest(snappyEncoded)) - traceAsyncErrors(futSnappy) + let + data = snappy.encode(SSZ.encode(msg)) + var futSnappy = node.switch.publish(topic & "_snappy", data) + traceMessage(futSnappy, sha256.digest(data)) # TODO: # At the moment, this is just a compatiblity shim for the existing RLPx functionality.