From 9de2c6171f0c707af35b186690c8b0ae930cc9df Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Thu, 7 Jul 2022 14:57:56 +0200 Subject: [PATCH] failed broadcast is not a routing error (#3843) a notice in the log is enough - we don't want the REST API to return an error in this case because that makes the validator client think something is seriously wrong (like the BN or message being broken) --- beacon_chain/validators/message_router.nim | 184 +++++++++------------ 1 file changed, 82 insertions(+), 102 deletions(-) diff --git a/beacon_chain/validators/message_router.nim b/beacon_chain/validators/message_router.nim index 87a88f5b3..2c55860a1 100644 --- a/beacon_chain/validators/message_router.nim +++ b/beacon_chain/validators/message_router.nim @@ -133,20 +133,26 @@ proc routeSignedBeaconBlock*( let sendTime = router[].getCurrentBeaconTime() delay = sendTime - blck.message.slot.block_deadline() + # The block passed basic gossip validation - we can "safely" broadcast it + # now. In fact, per the spec, we should broadcast it even if it later fails + # to apply to our state. + res = await router[].network.broadcastBeaconBlock(blck) - # The block passed basic gossip validation - we can "safely" broadcast it now. - # In fact, per the spec, we should broadcast it even if it later fails to - # apply to our state. - block: - let res = await router[].network.broadcastBeaconBlock(blck) - if res.isErr: - notice "Block not sent", - blockRoot = shortLog(blck.root), blck = shortLog(blck.message), - signature = shortLog(blck.signature), error = res.error() - return err(res.error()) + if res.isOk(): + beacon_blocks_sent.inc() + beacon_blocks_sent_delay.observe(delay.toFloatSeconds()) - let newBlockRef = await router[].blockProcessor.storeBlock( - MsgSource.api, sendTime, blck, true) + notice "Block sent", + blockRoot = shortLog(blck.root), blck = shortLog(blck.message), + signature = shortLog(blck.signature), delay + else: # "no broadcast" is not a fatal error + notice "Block not sent", + blockRoot = shortLog(blck.root), blck = shortLog(blck.message), + signature = shortLog(blck.signature), error = res.error() + + let + newBlockRef = await router[].blockProcessor.storeBlock( + MsgSource.api, sendTime, blck, true) # The boolean we return tells the caller whether the block was integrated # into the chain @@ -156,13 +162,6 @@ proc routeSignedBeaconBlock*( signature = shortLog(blck.signature), err = newBlockRef.error() return ok(Opt.none(BlockRef)) - beacon_blocks_sent.inc() - beacon_blocks_sent_delay.observe(delay.toFloatSeconds()) - - notice "Block sent", - blockRoot = shortLog(blck.root), blck = shortLog(blck.message), - signature = shortLog(blck.signature), delay - return ok(Opt.some(newBlockRef.get())) proc routeAttestation*( @@ -182,20 +181,17 @@ proc routeAttestation*( let sendTime = router[].processor.getCurrentBeaconTime() delay = sendTime - attestation.data.slot.attestation_deadline() + res = await router[].network.broadcastAttestation(subnet_id, attestation) - block: - let res = await router[].network.broadcastAttestation( - subnet_id, attestation) - if not res.isOk: - notice "Attestation not sent", - attestation = shortLog(attestation), error = res.error() - return err(res.error()) + if res.isOk(): + beacon_attestations_sent.inc() + beacon_attestation_sent_delay.observe(delay.toFloatSeconds()) - beacon_attestations_sent.inc() - beacon_attestation_sent_delay.observe(delay.toFloatSeconds()) - - notice "Attestation sent", - attestation = shortLog(attestation), delay, subnet_id + notice "Attestation sent", + attestation = shortLog(attestation), delay, subnet_id + else: # "no broadcast" is not a fatal error + notice "Attestation not sent", + attestation = shortLog(attestation), error = res.error() return ok() @@ -250,23 +246,21 @@ proc routeSignedAggregateAndProof*( let sendTime = router[].processor.getCurrentBeaconTime() delay = sendTime - proof.message.aggregate.data.slot.aggregate_deadline() + res = await router[].network.broadcastAggregateAndProof(proof) - block: - let res = await router[].network.broadcastAggregateAndProof(proof) - if not res.isOk(): - notice "Aggregated attestation not sent", - attestation = shortLog(proof.message.aggregate), - aggregator_index = proof.message.aggregator_index, - signature = shortLog(proof.signature), error = res.error() - return err(res.error()) + if res.isOk(): + beacon_aggregates_sent.inc() - beacon_aggregates_sent.inc() - - notice "Aggregated attestation sent", - attestation = shortLog(proof.message.aggregate), - aggregator_index = proof.message.aggregator_index, - selection_proof = shortLog(proof.message.selection_proof), - signature = shortLog(proof.signature), delay + notice "Aggregated attestation sent", + attestation = shortLog(proof.message.aggregate), + aggregator_index = proof.message.aggregator_index, + selection_proof = shortLog(proof.message.selection_proof), + signature = shortLog(proof.signature), delay + else: # "no broadcast" is not a fatal error + notice "Aggregated attestation not sent", + attestation = shortLog(proof.message.aggregate), + aggregator_index = proof.message.aggregator_index, + signature = shortLog(proof.signature), error = res.error() return ok() @@ -287,18 +281,17 @@ proc routeSyncCommitteeMessage*( sendTime = router[].processor.getCurrentBeaconTime() delay = sendTime - msg.slot.sync_committee_message_deadline() - block: - let res = await router[].network.broadcastSyncCommitteeMessage( + res = await router[].network.broadcastSyncCommitteeMessage( msg, subcommitteeIdx) - if not res.isOk(): - notice "Sync committee message not sent", - message = shortLog(msg), error = res.error() - return err(res.error()) - beacon_sync_committee_messages_sent.inc() - beacon_sync_committee_message_sent_delay.observe(delay.toFloatSeconds()) + if res.isOk(): + beacon_sync_committee_messages_sent.inc() + beacon_sync_committee_message_sent_delay.observe(delay.toFloatSeconds()) - notice "Sync committee message sent", message = shortLog(msg), delay + notice "Sync committee message sent", message = shortLog(msg), delay + else: # "no broadcast" is not a fatal error + notice "Sync committee message not sent", + message = shortLog(msg), error = res.error() if router[].onSyncCommitteeMessage != nil: router[].onSyncCommitteeMessage(msg.slot) @@ -407,23 +400,20 @@ proc routeSignedContributionAndProof*( sendTime = router[].processor.getCurrentBeaconTime() delay = sendTime - msg.message.contribution.slot.sync_contribution_deadline() - block: - let res = await router[].network.broadcastSignedContributionAndProof(msg) - if not res.isOk(): - notice "Contribution not sent", - contribution = shortLog(msg.message.contribution), - aggregator_index = msg.message.aggregator_index, - selection_proof = shortLog(msg.message.selection_proof), - signature = shortLog(msg.signature), error = res.error() - return err(res.error()) - - beacon_sync_committee_contributions_sent.inc() - - notice "Contribution sent", - contribution = shortLog(msg.message.contribution), - aggregator_index = msg.message.aggregator_index, - selection_proof = shortLog(msg.message.selection_proof), - signature = shortLog(msg.signature), delay + let res = await router[].network.broadcastSignedContributionAndProof(msg) + if res.isOk(): + beacon_sync_committee_contributions_sent.inc() + notice "Contribution sent", + contribution = shortLog(msg.message.contribution), + aggregator_index = msg.message.aggregator_index, + selection_proof = shortLog(msg.message.selection_proof), + signature = shortLog(msg.signature), delay + else: # "no broadcast" is not a fatal error + notice "Contribution not sent", + contribution = shortLog(msg.message.contribution), + aggregator_index = msg.message.aggregator_index, + selection_proof = shortLog(msg.message.selection_proof), + signature = shortLog(msg.signature), error = res.error() return ok() @@ -438,16 +428,12 @@ proc routeSignedVoluntaryExit*( exit = shortLog(exit), error = res.error() return err(res.error()[1]) - block: - let res = await router[].network.broadcastVoluntaryExit(exit) - if not res.isOk(): - notice "Voluntary exit not sent", - exit = shortLog(exit), error = res.error() - return err(res.error()) - - beacon_voluntary_exits_sent.inc() - - notice "Voluntary exit sent", exit = shortLog(exit) + let res = await router[].network.broadcastVoluntaryExit(exit) + if res.isOk(): + beacon_voluntary_exits_sent.inc() + notice "Voluntary exit sent", exit = shortLog(exit) + else: # "no broadcast" is not a fatal error + notice "Voluntary exit not sent", exit = shortLog(exit), error = res.error() return ok() @@ -462,16 +448,13 @@ proc routeAttesterSlashing*( slashing = shortLog(slashing), error = res.error() return err(res.error()[1]) - block: - let res = await router[].network.broadcastAttesterSlashing(slashing) - if not res.isOk(): - notice "Attester slashing not sent", - slashing = shortLog(slashing), error = res.error() - return err(res.error()) - - beacon_attester_slashings_sent.inc() - - notice "Attester slashing sent", slashing = shortLog(slashing) + let res = await router[].network.broadcastAttesterSlashing(slashing) + if res.isOk(): + beacon_attester_slashings_sent.inc() + notice "Attester slashing sent", slashing = shortLog(slashing) + else: # "no broadcast" is not a fatal error + notice "Attester slashing not sent", + slashing = shortLog(slashing), error = res.error() return ok() @@ -486,15 +469,12 @@ proc routeProposerSlashing*( slashing = shortLog(slashing), error = res.error() return err(res.error()[1]) - block: - let res = await router[].network.broadcastProposerSlashing(slashing) - if not res.isOk(): - notice "Proposer slashing not sent", - slashing = shortLog(slashing), error = res.error() - return err(res.error()) - - beacon_proposer_slashings_sent.inc() - - notice "Proposer slashing sent", slashing = shortLog(slashing) + let res = await router[].network.broadcastProposerSlashing(slashing) + if res.isOk(): + beacon_proposer_slashings_sent.inc() + notice "Proposer slashing sent", slashing = shortLog(slashing) + else: # "no broadcast" is not a fatal error + notice "Proposer slashing not sent", + slashing = shortLog(slashing), error = res.error() return ok()