diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 33c79a057..c690d1ad3 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -5,7 +5,7 @@ import # Status libs stew/[varints, base58, bitseqs], stew/shims/[macros, tables], stint, - faststreams/output_stream, snappy/framing, + faststreams/output_stream, snappy, snappy/framing, json_serialization, json_serialization/std/[net, options], chronos, chronicles, metrics, # TODO: create simpler to use libp2p modules that use re-exports @@ -1027,7 +1027,10 @@ proc subscribe*[MsgType](node: Eth2Node, trace "Incoming pubsub message received", peer = peerExpr, len = gossipBytes.len, topic = gossipTopic, message_id = `$`(sha256.digest(gossipBytes)) - msgHandler SSZ.decode(gossipBytes, MsgType) + if gossipTopic.endsWith("_snappy"): + msgHandler SSZ.decode(snappy.decode(gossipBytes), MsgType) + else: + msgHandler SSZ.decode(gossipBytes, MsgType) # All message types which are subscribed to should be validated; putting # this in subscribe(...) ensures that the default approach is correct. @@ -1035,7 +1038,10 @@ proc subscribe*[MsgType](node: Eth2Node, trace "Incoming pubsub message received for validation", len = gossipBytes.len, topic = gossipTopic, message_id = `$`(sha256.digest(gossipBytes)) - msgValidator SSZ.decode(gossipBytes, MsgType) + if gossipTopic.endsWith("_snappy"): + msgValidator SSZ.decode(snappy.decode(gossipBytes), MsgType) + else: + msgValidator SSZ.decode(gossipBytes, MsgType) # Validate messages as soon as subscribed let incomingMsgValidator = proc(topic: string, @@ -1044,12 +1050,17 @@ proc subscribe*[MsgType](node: Eth2Node, return execMsgValidator(message.data, topic) node.switch.addValidator(topic, incomingMsgValidator) + node.switch.addValidator(topic & "_snappy", incomingMsgValidator) let incomingMsgHandler = proc(topic: string, data: seq[byte]) {.async, gcsafe.} = execMsgHandler "unknown", data, topic - await node.switch.subscribe(topic, incomingMsgHandler) + var switchSubscriptions: seq[Future[void]] = @[] + switchSubscriptions.add(node.switch.subscribe(topic, incomingMsgHandler)) + switchSubscriptions.add(node.switch.subscribe(topic & "_snappy", incomingMsgHandler)) + + waitFor allFutures(switchSubscriptions) proc traceMessage(fut: FutureBase, digest: MDigest[256]) = fut.addCallback do (arg: pointer): @@ -1062,6 +1073,11 @@ proc broadcast*(node: Eth2Node, topic: string, msg: auto) = var fut = node.switch.publish(topic, broadcastBytes) traceMessage(fut, sha256.digest(broadcastBytes)) traceAsyncErrors(fut) + # also publish to the snappy-compressed topics + let snappy_encoded = snappy.encode(broadcastBytes) + var fut_snappy = node.switch.publish(topic & "_snappy", snappy_encoded) + traceMessage(fut_snappy, sha256.digest(snappy_encoded)) + traceAsyncErrors(fut_snappy) # TODO: # At the moment, this is just a compatiblity shim for the existing RLPx functionality.