mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-09 05:52:45 +00:00
use the eth2 message id for gossip (in logging too) (#1246)
* use the eth2 message id for gossip (in logging too) * bump * add spec link
This commit is contained in:
parent
41a4e47857
commit
816779733e
@ -4,7 +4,7 @@ import
|
|||||||
options as stdOptions,
|
options as stdOptions,
|
||||||
|
|
||||||
# Status libs
|
# Status libs
|
||||||
stew/[varints, base58, endians2, results, byteutils],
|
stew/[varints, base58, base64, endians2, results, byteutils],
|
||||||
stew/shims/net as stewNet,
|
stew/shims/net as stewNet,
|
||||||
stew/shims/[macros, tables],
|
stew/shims/[macros, tables],
|
||||||
faststreams/[inputs, outputs, buffers], snappy, snappy/framing,
|
faststreams/[inputs, outputs, buffers], snappy, snappy/framing,
|
||||||
@ -15,7 +15,7 @@ import
|
|||||||
multiaddress, multicodec, crypto/crypto, crypto/secp,
|
multiaddress, multicodec, crypto/crypto, crypto/secp,
|
||||||
protocols/identify, protocols/protocol],
|
protocols/identify, protocols/protocol],
|
||||||
libp2p/protocols/secure/[secure, secio],
|
libp2p/protocols/secure/[secure, secio],
|
||||||
libp2p/protocols/pubsub/[pubsub, floodsub, rpc/messages],
|
libp2p/protocols/pubsub/[pubsub, floodsub, rpc/message, rpc/messages],
|
||||||
libp2p/transports/tcptransport,
|
libp2p/transports/tcptransport,
|
||||||
libp2p/stream/lpstream,
|
libp2p/stream/lpstream,
|
||||||
eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl,
|
eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl,
|
||||||
@ -1079,6 +1079,13 @@ proc getPersistentNetKeys*(conf: BeaconNodeConf): KeyPair =
|
|||||||
|
|
||||||
KeyPair(seckey: privKey, pubkey: privKey.getKey().tryGet())
|
KeyPair(seckey: privKey, pubkey: privKey.getKey().tryGet())
|
||||||
|
|
||||||
|
func gossipId(data: openArray[byte]): string =
|
||||||
|
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/p2p-interface.md#topics-and-messages
|
||||||
|
base64.encode(Base64Url, sha256.digest(data).data)
|
||||||
|
|
||||||
|
func msgIdProvider(m: messages.Message): string =
|
||||||
|
gossipId(m.data)
|
||||||
|
|
||||||
proc createEth2Node*(conf: BeaconNodeConf, enrForkId: ENRForkID): Future[Eth2Node] {.async, gcsafe.} =
|
proc createEth2Node*(conf: BeaconNodeConf, enrForkId: ENRForkID): Future[Eth2Node] {.async, gcsafe.} =
|
||||||
var
|
var
|
||||||
(extIp, extTcpPort, extUdpPort) = setupNat(conf)
|
(extIp, extTcpPort, extUdpPort) = setupNat(conf)
|
||||||
@ -1096,7 +1103,8 @@ proc createEth2Node*(conf: BeaconNodeConf, enrForkId: ENRForkID): Future[Eth2Nod
|
|||||||
var switch = newStandardSwitch(some keys.seckey, hostAddress,
|
var switch = newStandardSwitch(some keys.seckey, hostAddress,
|
||||||
triggerSelf = true, gossip = true,
|
triggerSelf = true, gossip = true,
|
||||||
sign = false, verifySignature = false,
|
sign = false, verifySignature = false,
|
||||||
transportFlags = {ServerFlags.ReuseAddr})
|
transportFlags = {ServerFlags.ReuseAddr},
|
||||||
|
msgIdProvider = msgIdProvider)
|
||||||
result = Eth2Node.init(conf, enrForkId, switch,
|
result = Eth2Node.init(conf, enrForkId, switch,
|
||||||
extIp, extTcpPort, extUdpPort,
|
extIp, extTcpPort, extUdpPort,
|
||||||
keys.seckey.asEthKey)
|
keys.seckey.asEthKey)
|
||||||
@ -1137,44 +1145,44 @@ proc subscribe*[MsgType](node: Eth2Node,
|
|||||||
proc execMsgHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
proc execMsgHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
inc nbc_gossip_messages_received
|
inc nbc_gossip_messages_received
|
||||||
trace "Incoming pubsub message received",
|
trace "Incoming pubsub message received",
|
||||||
len = data.len, topic, message_id = `$`(sha256.digest(data))
|
len = data.len, topic, msgId = gossipId(data)
|
||||||
try:
|
try:
|
||||||
msgHandler SSZ.decode(snappy.decode(data), MsgType)
|
msgHandler SSZ.decode(snappy.decode(data), MsgType)
|
||||||
except CatchableError as err:
|
except CatchableError as err:
|
||||||
debug "Gossip msg handler error",
|
debug "Gossip msg handler error",
|
||||||
msg = err.msg, len = data.len, topic,
|
msg = err.msg, len = data.len, topic, msgId = gossipId(data)
|
||||||
message_id = `$`(sha256.digest(data))
|
|
||||||
|
|
||||||
# Validate messages as soon as subscribed
|
# Validate messages as soon as subscribed
|
||||||
proc execValidator(
|
proc execValidator(
|
||||||
topic: string, message: GossipMsg): Future[bool] {.async, gcsafe.} =
|
topic: string, message: GossipMsg): Future[bool] {.async, gcsafe.} =
|
||||||
trace "Validating incoming gossip message",
|
trace "Validating incoming gossip message",
|
||||||
len = message.data.len, topic, message_id = `$`(sha256.digest(message.data))
|
len = message.data.len, topic, msgId = gossipId(message.data)
|
||||||
try:
|
try:
|
||||||
return msgValidator SSZ.decode(snappy.decode(message.data), MsgType)
|
return msgValidator SSZ.decode(snappy.decode(message.data), MsgType)
|
||||||
except CatchableError as err:
|
except CatchableError as err:
|
||||||
debug "Gossip validation error", msg = err.msg
|
debug "Gossip validation error",
|
||||||
|
msg = err.msg, msgId = gossipId(message.data)
|
||||||
return false
|
return false
|
||||||
|
|
||||||
node.switch.addValidator(topic & "_snappy", execValidator)
|
node.switch.addValidator(topic & "_snappy", execValidator)
|
||||||
|
|
||||||
await node.switch.subscribe(topic & "_snappy", execMsgHandler)
|
await node.switch.subscribe(topic & "_snappy", execMsgHandler)
|
||||||
|
|
||||||
proc traceMessage(fut: FutureBase, digest: MDigest[256]) =
|
proc traceMessage(fut: FutureBase, msgId: string) =
|
||||||
fut.addCallback do (arg: pointer):
|
fut.addCallback do (arg: pointer):
|
||||||
if not(fut.failed):
|
if not(fut.failed):
|
||||||
trace "Outgoing pubsub message sent", message_id = `$`(digest)
|
trace "Outgoing pubsub message sent", msgId
|
||||||
elif fut.error != nil:
|
elif fut.error != nil:
|
||||||
debug "Gossip message not sent", msg = fut.error.msg
|
debug "Gossip message not sent", msgId, err = fut.error.msg
|
||||||
else:
|
else:
|
||||||
debug "Unexpected future state for gossip", state = fut.state
|
debug "Unexpected future state for gossip", msgId, state = fut.state
|
||||||
|
|
||||||
proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
|
proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
|
||||||
inc nbc_gossip_messages_sent
|
inc nbc_gossip_messages_sent
|
||||||
let
|
let
|
||||||
data = snappy.encode(SSZ.encode(msg))
|
data = snappy.encode(SSZ.encode(msg))
|
||||||
var futSnappy = node.switch.publish(topic & "_snappy", data)
|
var futSnappy = node.switch.publish(topic & "_snappy", data)
|
||||||
traceMessage(futSnappy, sha256.digest(data))
|
traceMessage(futSnappy, gossipId(data))
|
||||||
|
|
||||||
# TODO:
|
# TODO:
|
||||||
# At the moment, this is just a compatiblity shim for the existing RLPx functionality.
|
# At the moment, this is just a compatiblity shim for the existing RLPx functionality.
|
||||||
|
2
vendor/nim-libp2p
vendored
2
vendor/nim-libp2p
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 7a95f1844b039a3f9fc1d9dc6b9fe7182aafba39
|
Subproject commit aa6756dfe049dd3eba215938308ec956335c7e9e
|
Loading…
x
Reference in New Issue
Block a user