stop gossipping non-snappy (#1240)
* stop gossipping non-snappy Also simplify subscription and actually handle decoding errors * log weird states too
This commit is contained in:
parent
c0c1a7164b
commit
eeccaaf16d
|
@ -1134,69 +1134,47 @@ proc subscribe*[MsgType](node: Eth2Node,
|
||||||
topic: string,
|
topic: string,
|
||||||
msgHandler: proc(msg: MsgType) {.gcsafe.},
|
msgHandler: proc(msg: MsgType) {.gcsafe.},
|
||||||
msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) {.async, gcsafe.} =
|
msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) {.async, gcsafe.} =
|
||||||
template execMsgHandler(peerExpr, gossipBytes, gossipTopic, useSnappy) =
|
proc execMsgHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
inc nbc_gossip_messages_received
|
inc nbc_gossip_messages_received
|
||||||
trace "Incoming pubsub message received",
|
trace "Incoming pubsub message received",
|
||||||
peer = peerExpr, len = gossipBytes.len, topic = gossipTopic,
|
len = data.len, topic, message_id = `$`(sha256.digest(data))
|
||||||
message_id = `$`(sha256.digest(gossipBytes))
|
try:
|
||||||
when useSnappy:
|
msgHandler SSZ.decode(snappy.decode(data), MsgType)
|
||||||
msgHandler SSZ.decode(snappy.decode(gossipBytes), MsgType)
|
except CatchableError as err:
|
||||||
else:
|
debug "Gossip msg handler error",
|
||||||
msgHandler SSZ.decode(gossipBytes, MsgType)
|
msg = err.msg, len = data.len, topic,
|
||||||
|
message_id = `$`(sha256.digest(data))
|
||||||
# All message types which are subscribed to should be validated; putting
|
|
||||||
# this in subscribe(...) ensures that the default approach is correct.
|
|
||||||
template execMsgValidator(gossipBytes, gossipTopic, useSnappy): bool =
|
|
||||||
trace "Incoming pubsub message received for validation",
|
|
||||||
len = gossipBytes.len, topic = gossipTopic,
|
|
||||||
message_id = `$`(sha256.digest(gossipBytes))
|
|
||||||
when useSnappy:
|
|
||||||
msgValidator SSZ.decode(snappy.decode(gossipBytes), MsgType)
|
|
||||||
else:
|
|
||||||
msgValidator SSZ.decode(gossipBytes, MsgType)
|
|
||||||
|
|
||||||
# Validate messages as soon as subscribed
|
# Validate messages as soon as subscribed
|
||||||
let incomingMsgValidator = proc(topic: string,
|
proc execValidator(
|
||||||
message: GossipMsg): Future[bool]
|
topic: string, message: GossipMsg): Future[bool] {.async, gcsafe.} =
|
||||||
{.async, gcsafe.} =
|
trace "Validating incoming gossip message",
|
||||||
return execMsgValidator(message.data, topic, false)
|
len = message.data.len, topic, message_id = `$`(sha256.digest(message.data))
|
||||||
let incomingMsgValidatorSnappy = proc(topic: string,
|
try:
|
||||||
message: GossipMsg): Future[bool]
|
return msgValidator SSZ.decode(snappy.decode(message.data), MsgType)
|
||||||
{.async, gcsafe.} =
|
except CatchableError as err:
|
||||||
return execMsgValidator(message.data, topic, true)
|
debug "Gossip validation error", msg = err.msg
|
||||||
|
return false
|
||||||
|
|
||||||
node.switch.addValidator(topic, incomingMsgValidator)
|
node.switch.addValidator(topic & "_snappy", execValidator)
|
||||||
node.switch.addValidator(topic & "_snappy", incomingMsgValidatorSnappy)
|
|
||||||
|
|
||||||
let incomingMsgHandler = proc(topic: string,
|
await node.switch.subscribe(topic & "_snappy", execMsgHandler)
|
||||||
data: seq[byte]) {.async, gcsafe.} =
|
|
||||||
execMsgHandler "unknown", data, topic, false
|
|
||||||
let incomingMsgHandlerSnappy = proc(topic: string,
|
|
||||||
data: seq[byte]) {.async, gcsafe.} =
|
|
||||||
execMsgHandler "unknown", data, topic, true
|
|
||||||
|
|
||||||
var switchSubscriptions: seq[Future[void]] = @[]
|
|
||||||
switchSubscriptions.add(node.switch.subscribe(topic, incomingMsgHandler))
|
|
||||||
switchSubscriptions.add(node.switch.subscribe(topic & "_snappy", incomingMsgHandlerSnappy))
|
|
||||||
|
|
||||||
await allFutures(switchSubscriptions)
|
|
||||||
|
|
||||||
proc traceMessage(fut: FutureBase, digest: MDigest[256]) =
|
proc traceMessage(fut: FutureBase, digest: MDigest[256]) =
|
||||||
fut.addCallback do (arg: pointer):
|
fut.addCallback do (arg: pointer):
|
||||||
if not(fut.failed):
|
if not(fut.failed):
|
||||||
trace "Outgoing pubsub message sent", message_id = `$`(digest)
|
trace "Outgoing pubsub message sent", message_id = `$`(digest)
|
||||||
|
elif fut.error != nil:
|
||||||
|
debug "Gossip message not sent", msg = fut.error.msg
|
||||||
|
else:
|
||||||
|
debug "Unexpected future state for gossip", state = fut.state
|
||||||
|
|
||||||
proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
|
proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
|
||||||
inc nbc_gossip_messages_sent
|
inc nbc_gossip_messages_sent
|
||||||
let broadcastBytes = SSZ.encode(msg)
|
let
|
||||||
var fut = node.switch.publish(topic, broadcastBytes)
|
data = snappy.encode(SSZ.encode(msg))
|
||||||
traceMessage(fut, sha256.digest(broadcastBytes))
|
var futSnappy = node.switch.publish(topic & "_snappy", data)
|
||||||
traceAsyncErrors(fut)
|
traceMessage(futSnappy, sha256.digest(data))
|
||||||
# also publish to the snappy-compressed topics
|
|
||||||
let snappyEncoded = snappy.encode(broadcastBytes)
|
|
||||||
var futSnappy = node.switch.publish(topic & "_snappy", snappyEncoded)
|
|
||||||
traceMessage(futSnappy, sha256.digest(snappyEncoded))
|
|
||||||
traceAsyncErrors(futSnappy)
|
|
||||||
|
|
||||||
# TODO:
|
# TODO:
|
||||||
# At the moment, this is just a compatiblity shim for the existing RLPx functionality.
|
# At the moment, this is just a compatiblity shim for the existing RLPx functionality.
|
||||||
|
|
Loading…
Reference in New Issue