treat gossip decoding errors more strictly (#2793)

* penalize peers for sending gossip messages that fail decoding
* add metrics for decoding/decompression errors
* clean up obsolete exception handlers
This commit is contained in:
Jacek Sieka 2021-08-18 14:30:05 +02:00 committed by GitHub
parent bfe5e74607
commit 70259e4e64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 70 additions and 82 deletions

View File

@ -161,7 +161,7 @@ template errReject(msg: cstring): untyped =
# This doesn't depend on the wall clock or the exact state of the DAG; it's
# an internal consistency/correctness check only, and effectively never has
# false positives. These don't, for example, arise from timeouts.
doAssert false
raiseAssert $msg
err((ValidationResult.Reject, msg))
template errReject(error: (ValidationResult, cstring)): untyped =
@ -170,7 +170,7 @@ template errReject(error: (ValidationResult, cstring)): untyped =
# This doesn't depend on the wall clock or the exact state of the DAG; it's
# an internal consistency/correctness check only, and effectively never has
# false positives. These don't, for example, arise from timeouts.
doAssert false
raiseAssert $error[1]
err(error)
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#beacon_attestation_subnet_id

View File

@ -260,6 +260,12 @@ declareCounter nbc_gossip_messages_sent,
declareCounter nbc_gossip_messages_received,
"Number of gossip messages received by this peer"
declareCounter nbc_gossip_failed_snappy,
"Number of gossip messages that failed snappy decompression"
declareCounter nbc_gossip_failed_ssz,
"Number of gossip messages that failed SSZ parsing"
declareCounter nbc_successful_dials,
"Number of successfully dialed peers"
@ -784,9 +790,7 @@ proc handleIncomingStream(network: Eth2Node,
discard network.peerPool.checkPeerScore(peer)
proc toPeerAddr*(r: enr.TypedRecord,
proto: IpTransportProtocol): Result[PeerAddr, cstring] {.
raises: [Defect].} =
proto: IpTransportProtocol): Result[PeerAddr, cstring] =
if not r.secp256k1.isSome:
return err("enr: no secp256k1 key in record")
@ -890,14 +894,14 @@ proc connectWorker(node: Eth2Node, index: int) {.async.} =
# excluding peer here after processing.
node.connTable.excl(remotePeerAddr.peerId)
proc toPeerAddr(node: Node): Result[PeerAddr, cstring] {.raises: [Defect].} =
proc toPeerAddr(node: Node): Result[PeerAddr, cstring] =
let nodeRecord = ? node.record.toTypedRecord()
let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol)
ok(peerAddr)
proc queryRandom*(d: Eth2DiscoveryProtocol, forkId: ENRForkID,
attnets: BitArray[ATTESTATION_SUBNET_COUNT]):
Future[seq[PeerAddr]] {.async, raises: [Defect].} =
Future[seq[PeerAddr]] {.async.} =
## Perform a discovery query for a random target matching the eth2 field
## (forkId) and matching at least one of the attestation subnets.
let nodes = await d.queryRandom()
@ -1688,7 +1692,7 @@ proc shortForm*(id: NetKeyPair): string =
proc subscribe*(
node: Eth2Node, topic: string, topicParams: TopicParams,
enableTopicMetrics: bool = false) {.raises: [Defect, CatchableError].} =
enableTopicMetrics: bool = false) =
proc dummyMsgHandler(topic: string, data: seq[byte]): Future[void] =
# Avoid closure environment with `{.async.}`
var res = newFuture[void]("eth2_network.dummyMsgHandler")
@ -1702,10 +1706,7 @@ proc subscribe*(
node.pubsub.knownTopics.incl(topicName)
node.pubsub.topicParams[topicName] = topicParams
try:
node.pubsub.subscribe(topicName, dummyMsgHandler)
except CatchableError as exc: raise exc # TODO fix libp2p
except Exception as exc: raiseAssert exc.msg
node.pubsub.subscribe(topicName, dummyMsgHandler)
proc setValidTopics*(node: Eth2Node, topics: openArray[string]) =
let topicsSnappy = topics.mapIt(it & "_snappy")
@ -1724,70 +1725,66 @@ proc addValidator*[MsgType](node: Eth2Node,
topic: string,
msgValidator: proc(msg: MsgType):
ValidationResult {.gcsafe, raises: [Defect].} ) =
# Validate messages as soon as subscribed
proc execValidator(
topic: string, message: GossipMsg): Future[ValidationResult] {.raises: [Defect].} =
# Message validators run when subscriptions are enabled - they validate the
# data and return an indication of whether the message should be broadcast
# or not - validation is `async` but implemented without the macro because
# this is a performance hotspot.
proc execValidator(topic: string, message: GossipMsg):
Future[ValidationResult] {.raises: [Defect].} =
inc nbc_gossip_messages_received
trace "Validating incoming gossip message",
len = message.data.len, topic
trace "Validating incoming gossip message", len = message.data.len, topic
let res =
var decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE)
let res = if decompressed.len > 0:
try:
var decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE)
if decompressed.len > 0:
let decoded = SSZ.decode(decompressed, MsgType)
decompressed = newSeq[byte](0) # release memory before validating
msgValidator(decoded)
else:
debug "Empty gossip data after decompression",
topic, len = message.data.len
ValidationResult.Ignore
except CatchableError as err:
debug "Gossip validation error",
msg = err.msg, topic, len = message.data.len
ValidationResult.Ignore
return newValidationResultFuture(res)
let decoded = SSZ.decode(decompressed, MsgType)
decompressed = newSeq[byte](0) # release memory before validating
msgValidator(decoded) # doesn't raise!
except SszError as e:
inc nbc_gossip_failed_ssz
debug "Error decoding gossip",
topic, len = message.data.len, decompressed = decompressed.len,
error = e.msg
ValidationResult.Reject
else: # snappy returns empty seq on failed decompression
inc nbc_gossip_failed_snappy
debug "Error decompressing gossip", topic, len = message.data.len
ValidationResult.Reject
try:
node.pubsub.addValidator(topic & "_snappy", execValidator)
except Exception as exc: raiseAssert exc.msg # TODO fix libp2p
newValidationResultFuture(res)
node.pubsub.addValidator(topic & "_snappy", execValidator)
proc addAsyncValidator*[MsgType](node: Eth2Node,
topic: string,
msgValidator: proc(msg: MsgType):
Future[ValidationResult] {.gcsafe, raises: [Defect].} ) =
proc execValidator(
topic: string, message: GossipMsg): Future[ValidationResult] {.raises: [Defect].} =
proc execValidator(topic: string, message: GossipMsg):
Future[ValidationResult] {.raises: [Defect].} =
inc nbc_gossip_messages_received
trace "Validating incoming gossip message",
len = message.data.len, topic
trace "Validating incoming gossip message", len = message.data.len, topic
let res =
var decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE)
if decompressed.len > 0:
try:
var decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE)
if decompressed.len > 0:
let decoded = SSZ.decode(decompressed, MsgType)
decompressed = newSeq[byte](0) # release memory before validating
return msgValidator(decoded) # Reuses future from msgValidator
else:
debug "Empty gossip data after decompression",
topic, len = message.data.len
ValidationResult.Ignore
except CatchableError as err:
debug "Gossip validation error",
msg = err.msg, topic, len = message.data.len
ValidationResult.Ignore
return newValidationResultFuture(res)
let decoded = SSZ.decode(decompressed, MsgType)
decompressed = newSeq[byte](0) # release memory before validating
msgValidator(decoded) # doesn't raise!
except SszError as e:
inc nbc_gossip_failed_ssz
debug "Error decoding gossip",
topic, len = message.data.len, decompressed = decompressed.len,
error = e.msg
newValidationResultFuture(ValidationResult.Reject)
else: # snappy returns empty seq on failed decompression
inc nbc_gossip_failed_snappy
debug "Error decompressing gossip", topic, len = message.data.len
newValidationResultFuture(ValidationResult.Reject)
try:
node.pubsub.addValidator(topic & "_snappy", execValidator)
except Exception as exc: raiseAssert exc.msg # TODO fix libp2p
node.pubsub.addValidator(topic & "_snappy", execValidator)
proc unsubscribe*(node: Eth2Node, topic: string) {.raises: [Defect, CatchableError].} =
try:
node.pubsub.unsubscribeAll(topic & "_snappy")
except CatchableError as exc: raise exc
except Exception as exc: raiseAssert exc.msg # TODO fix libp2p
proc unsubscribe*(node: Eth2Node, topic: string) =
node.pubsub.unsubscribeAll(topic & "_snappy")
proc traceMessage(fut: FutureBase, msgId: seq[byte]) =
fut.addCallback do (arg: pointer):
@ -1821,8 +1818,7 @@ proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
raiseAssert exc.msg # TODO in-memory compression shouldn't fail
proc subscribeAttestationSubnets*(node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT],
forkDigest: ForkDigest)
{.raises: [Defect, CatchableError].} =
forkDigest: ForkDigest) =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation
# nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable
@ -1832,8 +1828,7 @@ proc subscribeAttestationSubnets*(node: Eth2Node, subnets: BitArray[ATTESTATION_
forkDigest, SubnetId(subnet_id)), TopicParams.init()) # don't score attestation subnets for now
proc unsubscribeAttestationSubnets*(node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT],
forkDigest: ForkDigest)
{.raises: [Defect, CatchableError].} =
forkDigest: ForkDigest) =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation
# nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable

View File

@ -655,8 +655,7 @@ proc getInitialAggregateSubnets(node: BeaconNode): Table[SubnetId, Slot] =
mergeAggregateSubnets(wallEpoch + 1)
proc subscribeAttestationSubnetHandlers(node: BeaconNode,
forkDigest: ForkDigest) {.
raises: [Defect, CatchableError].} =
forkDigest: ForkDigest) =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
# TODO:
# We might want to reuse the previous stability subnet if not expired when:
@ -755,8 +754,7 @@ static:
aggregateTopicParams.validateParameters().tryGet()
basicParams.validateParameters.tryGet()
proc addPhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest)
{.raises: [Defect, CatchableError].} =
proc addPhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.network.subscribe(getBeaconBlocksTopic(forkDigest), blocksTopicParams, enableTopicMetrics = true)
node.network.subscribe(getAttesterSlashingsTopic(forkDigest), basicParams)
node.network.subscribe(getProposerSlashingsTopic(forkDigest), basicParams)
@ -765,12 +763,10 @@ proc addPhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest)
node.subscribeAttestationSubnetHandlers(forkDigest)
proc addPhase0MessageHandlers(node: BeaconNode)
{.raises: [Defect, CatchableError].} =
proc addPhase0MessageHandlers(node: BeaconNode) =
addPhase0MessageHandlers(node, node.dag.forkDigests.phase0)
proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest)
{.raises: [Defect, CatchableError].} =
proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.network.unsubscribe(getBeaconBlocksTopic(forkDigest))
node.network.unsubscribe(getVoluntaryExitsTopic(forkDigest))
node.network.unsubscribe(getProposerSlashingsTopic(forkDigest))
@ -781,22 +777,19 @@ proc removePhase0MessageHandlers(node: BeaconNode, forkDigest: ForkDigest)
node.network.unsubscribe(
getAttestationTopic(forkDigest, SubnetId(subnet_id)))
proc removePhase0MessageHandlers(node: BeaconNode)
{.raises: [Defect, CatchableError].} =
proc removePhase0MessageHandlers(node: BeaconNode) =
removePhase0MessageHandlers(node, node.dag.forkDigests.phase0)
proc addAltairMessageHandlers(node: BeaconNode, slot: Slot)
{.raises: [Defect, CatchableError].} =
proc addAltairMessageHandlers(node: BeaconNode, slot: Slot) =
node.addPhase0MessageHandlers(node.dag.forkDigests.altair)
proc removeAltairMessageHandlers(node: BeaconNode)
{.raises: [Defect, CatchableError].} =
proc removeAltairMessageHandlers(node: BeaconNode) =
node.removePhase0MessageHandlers(node.dag.forkDigests.altair)
func getTopicSubscriptionEnabled(node: BeaconNode): bool =
node.attestationSubnets.enabled
proc removeAllMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} =
proc removeAllMessageHandlers(node: BeaconNode) =
node.removePhase0MessageHandlers()
node.removeAltairMessageHandlers()
@ -817,7 +810,7 @@ proc setupDoppelgangerDetection(node: BeaconNode, slot: Slot) =
broadcastStartEpoch =
node.processor.doppelgangerDetection.broadcastStartEpoch
proc updateGossipStatus(node: BeaconNode, slot: Slot) {.raises: [Defect, CatchableError].} =
proc updateGossipStatus(node: BeaconNode, slot: Slot) =
# Syncing tends to be ~1 block/s, and allow for an epoch of time for libp2p
# subscribing to spin up. The faster the sync, the more wallSlot - headSlot
# lead time is required