fixed commets from review

This commit is contained in:
Viktor Kirilov 2020-04-27 19:10:46 +03:00 committed by zah
parent 5ecba6df49
commit 8ef28c905d
1 changed files with 20 additions and 13 deletions

View File

@ -1022,23 +1022,23 @@ proc subscribe*[MsgType](node: Eth2Node,
topic: string, topic: string,
msgHandler: proc(msg: MsgType) {.gcsafe.}, msgHandler: proc(msg: MsgType) {.gcsafe.},
msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) {.async, gcsafe.} = msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) {.async, gcsafe.} =
template execMsgHandler(peerExpr, gossipBytes, gossipTopic) = template execMsgHandler(peerExpr, gossipBytes, gossipTopic, useSnappy) =
inc gossip_messages_received inc gossip_messages_received
trace "Incoming pubsub message received", trace "Incoming pubsub message received",
peer = peerExpr, len = gossipBytes.len, topic = gossipTopic, peer = peerExpr, len = gossipBytes.len, topic = gossipTopic,
message_id = `$`(sha256.digest(gossipBytes)) message_id = `$`(sha256.digest(gossipBytes))
if gossipTopic.endsWith("_snappy"): if useSnappy:
msgHandler SSZ.decode(snappy.decode(gossipBytes), MsgType) msgHandler SSZ.decode(snappy.decode(gossipBytes), MsgType)
else: else:
msgHandler SSZ.decode(gossipBytes, MsgType) msgHandler SSZ.decode(gossipBytes, MsgType)
# All message types which are subscribed to should be validated; putting # All message types which are subscribed to should be validated; putting
# this in subscribe(...) ensures that the default approach is correct. # this in subscribe(...) ensures that the default approach is correct.
template execMsgValidator(gossipBytes, gossipTopic): bool = template execMsgValidator(gossipBytes, gossipTopic, useSnappy): bool =
trace "Incoming pubsub message received for validation", trace "Incoming pubsub message received for validation",
len = gossipBytes.len, topic = gossipTopic, len = gossipBytes.len, topic = gossipTopic,
message_id = `$`(sha256.digest(gossipBytes)) message_id = `$`(sha256.digest(gossipBytes))
if gossipTopic.endsWith("_snappy"): if useSnappy:
msgValidator SSZ.decode(snappy.decode(gossipBytes), MsgType) msgValidator SSZ.decode(snappy.decode(gossipBytes), MsgType)
else: else:
msgValidator SSZ.decode(gossipBytes, MsgType) msgValidator SSZ.decode(gossipBytes, MsgType)
@ -1047,20 +1047,27 @@ proc subscribe*[MsgType](node: Eth2Node,
let incomingMsgValidator = proc(topic: string, let incomingMsgValidator = proc(topic: string,
message: GossipMsg): Future[bool] message: GossipMsg): Future[bool]
{.async, gcsafe.} = {.async, gcsafe.} =
return execMsgValidator(message.data, topic) return execMsgValidator(message.data, topic, false)
let incomingMsgValidatorSnappy = proc(topic: string,
message: GossipMsg): Future[bool]
{.async, gcsafe.} =
return execMsgValidator(message.data, topic, true)
node.switch.addValidator(topic, incomingMsgValidator) node.switch.addValidator(topic, incomingMsgValidator)
node.switch.addValidator(topic & "_snappy", incomingMsgValidator) node.switch.addValidator(topic & "_snappy", incomingMsgValidatorSnappy)
let incomingMsgHandler = proc(topic: string, let incomingMsgHandler = proc(topic: string,
data: seq[byte]) {.async, gcsafe.} = data: seq[byte]) {.async, gcsafe.} =
execMsgHandler "unknown", data, topic execMsgHandler "unknown", data, topic, false
let incomingMsgHandlerSnappy = proc(topic: string,
data: seq[byte]) {.async, gcsafe.} =
execMsgHandler "unknown", data, topic, true
var switchSubscriptions: seq[Future[void]] = @[] var switchSubscriptions: seq[Future[void]] = @[]
switchSubscriptions.add(node.switch.subscribe(topic, incomingMsgHandler)) switchSubscriptions.add(node.switch.subscribe(topic, incomingMsgHandler))
switchSubscriptions.add(node.switch.subscribe(topic & "_snappy", incomingMsgHandler)) switchSubscriptions.add(node.switch.subscribe(topic & "_snappy", incomingMsgHandlerSnappy))
waitFor allFutures(switchSubscriptions) await allFutures(switchSubscriptions)
proc traceMessage(fut: FutureBase, digest: MDigest[256]) = proc traceMessage(fut: FutureBase, digest: MDigest[256]) =
fut.addCallback do (arg: pointer): fut.addCallback do (arg: pointer):
@ -1074,10 +1081,10 @@ proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
traceMessage(fut, sha256.digest(broadcastBytes)) traceMessage(fut, sha256.digest(broadcastBytes))
traceAsyncErrors(fut) traceAsyncErrors(fut)
# also publish to the snappy-compressed topics # also publish to the snappy-compressed topics
let snappy_encoded = snappy.encode(broadcastBytes) let snappyEncoded = snappy.encode(broadcastBytes)
var fut_snappy = node.switch.publish(topic & "_snappy", snappy_encoded) var futSnappy = node.switch.publish(topic & "_snappy", snappyEncoded)
traceMessage(fut_snappy, sha256.digest(snappy_encoded)) traceMessage(futSnappy, sha256.digest(snappyEncoded))
traceAsyncErrors(fut_snappy) traceAsyncErrors(futSnappy)
# TODO: # TODO:
# At the moment, this is just a compatiblity shim for the existing RLPx functionality. # At the moment, this is just a compatiblity shim for the existing RLPx functionality.