Gossip message id should decompress data (#1998)
* also fix missing return on empty message
This commit is contained in:
parent
5644cfae8f
commit
43c6c5a0b3
|
@ -1361,20 +1361,25 @@ proc getPersistentNetKeys*(rng: var BrHmacDrbgContext,
|
|||
let privKey = res.get()
|
||||
return KeyPair(seckey: privKey, pubkey: privkey.getKey().tryGet())
|
||||
|
||||
func gossipId(data: openArray[byte]): seq[byte] =
|
||||
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0-rc.0/specs/phase0/p2p-interface.md#topics-and-messages
|
||||
# We don't use non-Snappy-compressed messages, so don't define
|
||||
# MESSAGE_DOMAIN_INVALID_SNAPPY.
|
||||
const MESSAGE_DOMAIN_VALID_SNAPPY = 0x01000000'u64
|
||||
func gossipId(data: openArray[byte], valid: bool): seq[byte] =
|
||||
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#topics-and-messages
|
||||
const
|
||||
MESSAGE_DOMAIN_INVALID_SNAPPY = [0x00'u8, 0x00, 0x00, 0x00]
|
||||
MESSAGE_DOMAIN_VALID_SNAPPY = [0x01'u8, 0x00, 0x00, 0x00]
|
||||
let messageDigest = withEth2Hash:
|
||||
h.update uint_to_bytes4(MESSAGE_DOMAIN_VALID_SNAPPY)
|
||||
h.update(
|
||||
if valid: MESSAGE_DOMAIN_VALID_SNAPPY else: MESSAGE_DOMAIN_INVALID_SNAPPY)
|
||||
h.update data
|
||||
|
||||
result = newSeq[byte](20)
|
||||
result[0..19] = messageDigest.data.toOpenArray(0, 19)
|
||||
|
||||
func msgIdProvider(m: messages.Message): seq[byte] =
|
||||
gossipId(m.data)
|
||||
try:
|
||||
let decoded = snappy.decode(m.data, GOSSIP_MAX_SIZE)
|
||||
gossipId(decoded, true)
|
||||
except CatchableError:
|
||||
gossipId(m.data, false)
|
||||
|
||||
proc createEth2Node*(rng: ref BrHmacDrbgContext,
|
||||
conf: BeaconNodeConf,
|
||||
|
@ -1436,18 +1441,22 @@ proc addValidator*[MsgType](node: Eth2Node,
|
|||
topic: string, message: GossipMsg): Future[ValidationResult] {.async.} =
|
||||
inc nbc_gossip_messages_received
|
||||
trace "Validating incoming gossip message",
|
||||
len = message.data.len, topic, msgId = gossipId(message.data)
|
||||
try:
|
||||
let decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE)
|
||||
if decompressed.len > 0:
|
||||
return msgValidator(SSZ.decode(decompressed, MsgType))
|
||||
else:
|
||||
# TODO(zah) penalize peer?
|
||||
debug "Failed to decompress gossip payload"
|
||||
except CatchableError as err:
|
||||
debug "Gossip validation error",
|
||||
msg = err.msg, msgId = gossipId(message.data)
|
||||
return ValidationResult.Ignore
|
||||
len = message.data.len, topic
|
||||
|
||||
let res =
|
||||
try:
|
||||
let decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE)
|
||||
if decompressed.len > 0:
|
||||
msgValidator(SSZ.decode(decompressed, MsgType))
|
||||
else:
|
||||
debug "Empty gossip data after decompression",
|
||||
topic, len = message.data.len
|
||||
ValidationResult.Ignore
|
||||
except CatchableError as err:
|
||||
debug "Gossip validation error",
|
||||
msg = err.msg, topic, len = message.data.len
|
||||
ValidationResult.Ignore
|
||||
return res
|
||||
|
||||
node.pubsub.addValidator(topic & "_snappy", execValidator)
|
||||
|
||||
|
@ -1464,12 +1473,14 @@ proc traceMessage(fut: FutureBase, msgId: string) =
|
|||
debug "Unexpected future state for gossip", msgId, state = fut.state
|
||||
|
||||
proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
|
||||
let data = snappy.encode(SSZ.encode(msg))
|
||||
let
|
||||
uncompressed = SSZ.encode(msg)
|
||||
compressed = snappy.encode(uncompressed)
|
||||
|
||||
# This is only for messages we create. A message this large amounts to an
|
||||
# internal logic error.
|
||||
doAssert data.len <= GOSSIP_MAX_SIZE
|
||||
doAssert uncompressed.len <= GOSSIP_MAX_SIZE
|
||||
inc nbc_gossip_messages_sent
|
||||
|
||||
var futSnappy = node.pubsub.publish(topic & "_snappy", data)
|
||||
traceMessage(futSnappy, string.fromBytes(gossipId(data)))
|
||||
var futSnappy = node.pubsub.publish(topic & "_snappy", compressed)
|
||||
traceMessage(futSnappy, string.fromBytes(gossipId(uncompressed, true)))
|
||||
|
|
Loading…
Reference in New Issue