broadcasting and listening to the '_snappy' topics as well
This commit is contained in:
parent
898df9ba45
commit
5ecba6df49
|
@ -5,7 +5,7 @@ import
|
||||||
|
|
||||||
# Status libs
|
# Status libs
|
||||||
stew/[varints, base58, bitseqs], stew/shims/[macros, tables], stint,
|
stew/[varints, base58, bitseqs], stew/shims/[macros, tables], stint,
|
||||||
faststreams/output_stream, snappy/framing,
|
faststreams/output_stream, snappy, snappy/framing,
|
||||||
json_serialization, json_serialization/std/[net, options],
|
json_serialization, json_serialization/std/[net, options],
|
||||||
chronos, chronicles, metrics,
|
chronos, chronicles, metrics,
|
||||||
# TODO: create simpler to use libp2p modules that use re-exports
|
# TODO: create simpler to use libp2p modules that use re-exports
|
||||||
|
@ -1027,7 +1027,10 @@ proc subscribe*[MsgType](node: Eth2Node,
|
||||||
trace "Incoming pubsub message received",
|
trace "Incoming pubsub message received",
|
||||||
peer = peerExpr, len = gossipBytes.len, topic = gossipTopic,
|
peer = peerExpr, len = gossipBytes.len, topic = gossipTopic,
|
||||||
message_id = `$`(sha256.digest(gossipBytes))
|
message_id = `$`(sha256.digest(gossipBytes))
|
||||||
msgHandler SSZ.decode(gossipBytes, MsgType)
|
if gossipTopic.endsWith("_snappy"):
|
||||||
|
msgHandler SSZ.decode(snappy.decode(gossipBytes), MsgType)
|
||||||
|
else:
|
||||||
|
msgHandler SSZ.decode(gossipBytes, MsgType)
|
||||||
|
|
||||||
# All message types which are subscribed to should be validated; putting
|
# All message types which are subscribed to should be validated; putting
|
||||||
# this in subscribe(...) ensures that the default approach is correct.
|
# this in subscribe(...) ensures that the default approach is correct.
|
||||||
|
@ -1035,7 +1038,10 @@ proc subscribe*[MsgType](node: Eth2Node,
|
||||||
trace "Incoming pubsub message received for validation",
|
trace "Incoming pubsub message received for validation",
|
||||||
len = gossipBytes.len, topic = gossipTopic,
|
len = gossipBytes.len, topic = gossipTopic,
|
||||||
message_id = `$`(sha256.digest(gossipBytes))
|
message_id = `$`(sha256.digest(gossipBytes))
|
||||||
msgValidator SSZ.decode(gossipBytes, MsgType)
|
if gossipTopic.endsWith("_snappy"):
|
||||||
|
msgValidator SSZ.decode(snappy.decode(gossipBytes), MsgType)
|
||||||
|
else:
|
||||||
|
msgValidator SSZ.decode(gossipBytes, MsgType)
|
||||||
|
|
||||||
# Validate messages as soon as subscribed
|
# Validate messages as soon as subscribed
|
||||||
let incomingMsgValidator = proc(topic: string,
|
let incomingMsgValidator = proc(topic: string,
|
||||||
|
@ -1044,12 +1050,17 @@ proc subscribe*[MsgType](node: Eth2Node,
|
||||||
return execMsgValidator(message.data, topic)
|
return execMsgValidator(message.data, topic)
|
||||||
|
|
||||||
node.switch.addValidator(topic, incomingMsgValidator)
|
node.switch.addValidator(topic, incomingMsgValidator)
|
||||||
|
node.switch.addValidator(topic & "_snappy", incomingMsgValidator)
|
||||||
|
|
||||||
let incomingMsgHandler = proc(topic: string,
|
let incomingMsgHandler = proc(topic: string,
|
||||||
data: seq[byte]) {.async, gcsafe.} =
|
data: seq[byte]) {.async, gcsafe.} =
|
||||||
execMsgHandler "unknown", data, topic
|
execMsgHandler "unknown", data, topic
|
||||||
|
|
||||||
await node.switch.subscribe(topic, incomingMsgHandler)
|
var switchSubscriptions: seq[Future[void]] = @[]
|
||||||
|
switchSubscriptions.add(node.switch.subscribe(topic, incomingMsgHandler))
|
||||||
|
switchSubscriptions.add(node.switch.subscribe(topic & "_snappy", incomingMsgHandler))
|
||||||
|
|
||||||
|
waitFor allFutures(switchSubscriptions)
|
||||||
|
|
||||||
proc traceMessage(fut: FutureBase, digest: MDigest[256]) =
|
proc traceMessage(fut: FutureBase, digest: MDigest[256]) =
|
||||||
fut.addCallback do (arg: pointer):
|
fut.addCallback do (arg: pointer):
|
||||||
|
@ -1062,6 +1073,11 @@ proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
|
||||||
var fut = node.switch.publish(topic, broadcastBytes)
|
var fut = node.switch.publish(topic, broadcastBytes)
|
||||||
traceMessage(fut, sha256.digest(broadcastBytes))
|
traceMessage(fut, sha256.digest(broadcastBytes))
|
||||||
traceAsyncErrors(fut)
|
traceAsyncErrors(fut)
|
||||||
|
# also publish to the snappy-compressed topics
|
||||||
|
let snappy_encoded = snappy.encode(broadcastBytes)
|
||||||
|
var fut_snappy = node.switch.publish(topic & "_snappy", snappy_encoded)
|
||||||
|
traceMessage(fut_snappy, sha256.digest(snappy_encoded))
|
||||||
|
traceAsyncErrors(fut_snappy)
|
||||||
|
|
||||||
# TODO:
|
# TODO:
|
||||||
# At the moment, this is just a compatiblity shim for the existing RLPx functionality.
|
# At the moment, this is just a compatiblity shim for the existing RLPx functionality.
|
||||||
|
|
Loading…
Reference in New Issue