Introduce message router (#3829)

Whether new blocks/attestations/etc. are produced internally or received
via REST, their journey through the node is the same - to ensure that
they get the same treatment (logging, metrics, processing), this PR
moves the routing to a dedicated module and fixes several small
differences that existed before.

* `xxxValidator` -> `processMessageName` - the processor was also adding
messages to pools, so we want the name to reflect that action
* add missing "sent" metrics for some messages
* document ignore policy better - already-seen messages are not actually
rebroadcast by libp2p
* skip redundant signature checks for internal validators consistently
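
For illustration, the unified path looks roughly like this from a caller's
perspective (a hedged sketch building on the routeAttestation signature added
in this PR; submitAttestation is a hypothetical helper, not part of the change):

proc submitAttestation(node: BeaconNode, attestation: Attestation) {.async.} =
  # Same validation, metrics and pool registration regardless of whether the
  # attestation was produced by an attached validator or received via REST
  let res = await node.router.routeAttestation(attestation)
  if res.isErr():
    warn "Attestation was not routed", error = res.error()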
Jacek Sieka 2022-07-06 18:11:44 +02:00 committed by GitHub
parent ae05ba9a48
commit e1830519a4
17 changed files with 819 additions and 762 deletions


@ -23,7 +23,8 @@ import
sync_committee_msg_pool],
./spec/datatypes/[base, altair],
./sync/[optimistic_sync_light_client, sync_manager, request_manager],
./validators/[action_tracker, validator_monitor, validator_pool],
./validators/[
action_tracker, message_router, validator_monitor, validator_pool],
./rpc/state_ttl_cache
export
@ -32,7 +33,8 @@ export
attestation_pool, sync_committee_msg_pool, validator_pool,
eth2_network, eth1_monitor, optimistic_sync_light_client,
request_manager, sync_manager, eth2_processor, blockchain_dag,
block_quarantine, base, exit_pool, validator_monitor, consensus_manager
block_quarantine, base, exit_pool, message_router, validator_monitor,
consensus_manager
type
RpcServer* = RpcHttpServer
@ -85,6 +87,7 @@ type
validatorMonitor*: ref ValidatorMonitor
stateTtlCache*: StateTtlCache
nextExchangeTransitionConfTime*: Moment
router*: ref MessageRouter
const
MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT


@ -9,10 +9,10 @@
import
# Standard libraries
std/[options, tables, sequtils],
std/[tables, sequtils],
# Status libraries
metrics,
chronicles, stew/byteutils,
chronicles, stew/[byteutils, results],
# Internal
../spec/[
beaconstate, eth2_merkleization, forks, helpers,
@ -22,7 +22,7 @@ import
../fork_choice/fork_choice,
../beacon_clock
export options, tables, phase0, altair, bellatrix, blockchain_dag, fork_choice
export tables, results, phase0, altair, bellatrix, blockchain_dag, fork_choice
const
ATTESTATION_LOOKBACK* =
@ -177,12 +177,12 @@ proc addForkChoiceVotes(
# hopefully the fork choice will heal itself over time.
error "Couldn't add attestation to fork choice, bug?", err = v.error()
func candidateIdx(pool: AttestationPool, slot: Slot): Option[int] =
func candidateIdx(pool: AttestationPool, slot: Slot): Opt[int] =
if slot >= pool.startingSlot and
slot < (pool.startingSlot + pool.candidates.lenu64):
some(int(slot mod pool.candidates.lenu64))
Opt.some(int(slot mod pool.candidates.lenu64))
else:
none(int)
Opt.none(int)
proc updateCurrent(pool: var AttestationPool, wallSlot: Slot) =
if wallSlot + 1 < pool.candidates.lenu64:
@ -209,15 +209,15 @@ proc updateCurrent(pool: var AttestationPool, wallSlot: Slot) =
pool.startingSlot = newStartingSlot
func oneIndex(bits: CommitteeValidatorsBits): Option[int] =
func oneIndex(bits: CommitteeValidatorsBits): Opt[int] =
# Find the index of the set bit, iff one bit is set
var res = none(int)
var res = Opt.none(int)
for idx in 0..<bits.len():
if bits[idx]:
if res.isNone():
res = some(idx)
res = Opt.some(idx)
else: # More than one bit set!
return none(int)
return Opt.none(int)
res
func toAttestation(entry: AttestationEntry, validation: Validation): Attestation =
@ -417,8 +417,8 @@ proc addForkChoice*(pool: var AttestationPool,
error "Couldn't add block to fork choice, bug?",
blck = shortLog(blck), err = state.error
iterator attestations*(pool: AttestationPool, slot: Option[Slot],
committee_index: Option[CommitteeIndex]): Attestation =
iterator attestations*(pool: AttestationPool, slot: Opt[Slot],
committee_index: Opt[CommitteeIndex]): Attestation =
let candidateIndices =
if slot.isSome():
let candidateIdx = pool.candidateIdx(slot.get())
@ -690,11 +690,11 @@ func bestValidation(aggregates: openArray[Validation]): (int, int) =
func getAggregatedAttestation*(pool: var AttestationPool,
slot: Slot,
attestation_data_root: Eth2Digest): Option[Attestation] =
attestation_data_root: Eth2Digest): Opt[Attestation] =
let
candidateIdx = pool.candidateIdx(slot)
if candidateIdx.isNone:
return none(Attestation)
return Opt.none(Attestation)
pool.candidates[candidateIdx.get].withValue(attestation_data_root, entry):
entry[].updateAggregates()
@ -702,22 +702,22 @@ func getAggregatedAttestation*(pool: var AttestationPool,
let (bestIndex, _) = bestValidation(entry[].aggregates)
# Found the right hash, no need to look further
return some(entry[].toAttestation(entry[].aggregates[bestIndex]))
return Opt.some(entry[].toAttestation(entry[].aggregates[bestIndex]))
none(Attestation)
Opt.none(Attestation)
func getAggregatedAttestation*(pool: var AttestationPool,
slot: Slot,
index: CommitteeIndex): Option[Attestation] =
index: CommitteeIndex): Opt[Attestation] =
## Select the attestation that has the most votes going for it in the given
## slot/index
## https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#construct-aggregate
let
candidateIdx = pool.candidateIdx(slot)
if candidateIdx.isNone:
return none(Attestation)
return Opt.none(Attestation)
var res: Option[Attestation]
var res: Opt[Attestation]
for _, entry in pool.candidates[candidateIdx.get].mpairs():
doAssert entry.data.slot == slot
if index != entry.data.index:
@ -728,7 +728,7 @@ func getAggregatedAttestation*(pool: var AttestationPool,
let (bestIndex, best) = bestValidation(entry.aggregates)
if res.isNone() or best > res.get().aggregation_bits.countOnes():
res = some(entry.toAttestation(entry.aggregates[bestIndex]))
res = Opt.some(entry.toAttestation(entry.aggregates[bestIndex]))
res


@ -182,7 +182,6 @@ proc storeBlock*(
let
attestationPool = self.consensusManager.attestationPool
startTick = Moment.now()
wallSlot = wallTime.slotOrZero()
vm = self.validatorMonitor
dag = self.consensusManager.dag


@ -117,7 +117,7 @@ type
# Gossip validated -> enqueue for further verification
# ----------------------------------------------------------------
blockProcessor: ref BlockProcessor
blockProcessor*: ref BlockProcessor
# Validator monitoring
validatorMonitor: ref ValidatorMonitor
@ -188,7 +188,7 @@ proc new*(T: type Eth2Processor,
# any side effects until the message is fully validated, or invalid messages
# could be used to push out valid messages.
proc blockValidator*(
proc processSignedBeaconBlock*(
self: var Eth2Processor, src: MsgSource,
signedBlock: ForkySignedBeaconBlock): ValidationRes =
let
@ -287,7 +287,7 @@ proc checkForPotentialDoppelganger(
const QuitDoppelganger = 1031
quit QuitDoppelganger
proc attestationValidator*(
proc processAttestation*(
self: ref Eth2Processor, src: MsgSource,
attestation: Attestation, subnet_id: SubnetId,
checkSignature: bool = true): Future[ValidationRes] {.async.} =
@ -335,9 +335,10 @@ proc attestationValidator*(
beacon_attestations_dropped.inc(1, [$v.error[0]])
err(v.error())
proc aggregateValidator*(
proc processSignedAggregateAndProof*(
self: ref Eth2Processor, src: MsgSource,
signedAggregateAndProof: SignedAggregateAndProof): Future[ValidationRes] {.async.} =
signedAggregateAndProof: SignedAggregateAndProof,
checkSignature = true, checkCover = true): Future[ValidationRes] {.async.} =
var wallTime = self.getCurrentBeaconTime()
let (afterGenesis, wallSlot) = wallTime.toSlot()
@ -350,7 +351,7 @@ proc aggregateValidator*(
if not afterGenesis:
notice "Aggregate before genesis"
return errIgnore("Aggreagte before genesis")
return errIgnore("Aggregate before genesis")
# Potential under/overflows are fine; would just create odd logs
let delay =
@ -359,7 +360,8 @@ proc aggregateValidator*(
let v =
await self.attestationPool.validateAggregate(
self.batchCrypto, signedAggregateAndProof, wallTime)
self.batchCrypto, signedAggregateAndProof, wallTime,
checkSignature = checkSignature, checkCover = checkCover)
return if v.isOk():
# Due to async validation the wallTime here might have changed
@ -389,7 +391,7 @@ proc aggregateValidator*(
err(v.error())
proc attesterSlashingValidator*(
proc processAttesterSlashing*(
self: var Eth2Processor, src: MsgSource,
attesterSlashing: AttesterSlashing): ValidationRes =
logScope:
@ -413,7 +415,7 @@ proc attesterSlashingValidator*(
v
proc proposerSlashingValidator*(
proc processProposerSlashing*(
self: var Eth2Processor, src: MsgSource,
proposerSlashing: ProposerSlashing): Result[void, ValidationError] =
logScope:
@ -436,7 +438,7 @@ proc proposerSlashingValidator*(
v
proc voluntaryExitValidator*(
proc processSignedVoluntaryExit*(
self: var Eth2Processor, src: MsgSource,
signedVoluntaryExit: SignedVoluntaryExit): Result[void, ValidationError] =
logScope:
@ -460,7 +462,7 @@ proc voluntaryExitValidator*(
v
proc syncCommitteeMessageValidator*(
proc processSyncCommitteeMessage*(
self: ref Eth2Processor, src: MsgSource,
syncCommitteeMsg: SyncCommitteeMessage,
subcommitteeIdx: SyncSubcommitteeIndex,
@ -505,7 +507,7 @@ proc syncCommitteeMessageValidator*(
beacon_sync_committee_messages_dropped.inc(1, [$v.error[0]])
err(v.error())
proc contributionValidator*(
proc processSignedContributionAndProof*(
self: ref Eth2Processor, src: MsgSource,
contributionAndProof: SignedContributionAndProof,
checkSignature: bool = true): Future[Result[void, ValidationError]] {.async.} =
@ -547,7 +549,7 @@ proc contributionValidator*(
err(v.error())
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#light_client_finality_update
proc lightClientFinalityUpdateValidator*(
proc processLightClientFinalityUpdate*(
self: var Eth2Processor, src: MsgSource,
finality_update: altair.LightClientFinalityUpdate
): Result[void, ValidationError] =
@ -558,7 +560,7 @@ proc lightClientFinalityUpdateValidator*(
v
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#light_client_optimistic_update
proc lightClientOptimisticUpdateValidator*(
proc processLightClientOptimisticUpdate*(
self: var Eth2Processor, src: MsgSource,
optimistic_update: altair.LightClientOptimisticUpdate
): Result[void, ValidationError] =


@ -554,7 +554,8 @@ proc validateAggregate*(
pool: ref AttestationPool,
batchCrypto: ref BatchCrypto,
signedAggregateAndProof: SignedAggregateAndProof,
wallTime: BeaconTime):
wallTime: BeaconTime,
checkSignature = true, checkCover = true):
Future[Result[
tuple[attestingIndices: seq[ValidatorIndex], sig: CookedSig],
ValidationError]] {.async.} =
@ -622,22 +623,12 @@ proc validateAggregate*(
return err(v.error)
v.get()
if pool[].covers(aggregate.data, aggregate.aggregation_bits):
# https://github.com/ethereum/consensus-specs/issues/2183 - althoughh this
# check was temporarily removed from the spec, the intent is to reinstate it
# per discussion in the ticket.
#
# [IGNORE] The valid aggregate attestation defined by
# `hash_tree_root(aggregate)` has _not_ already been seen
# (via aggregate gossip, within a verified block, or through the creation of
# an equivalent aggregate locally).
# Our implementation of this check is slightly different in that it doesn't
# consider aggregates from verified blocks - this would take a rather heavy
# index to work correcly under fork conditions - we also check for coverage
# of attestation bits instead of comparing with full root of aggreagte:
# this captures the spirit of the checkk by ignoring aggregates that are
# strict subsets of other, already-seen aggregates.
if checkCover and
pool[].covers(aggregate.data, aggregate.aggregation_bits):
# [IGNORE] A valid aggregate attestation defined by
# `hash_tree_root(aggregate.data)` whose `aggregation_bits` is a non-strict
# superset has _not_ already been seen.
# https://github.com/ethereum/consensus-specs/pull/2847
return errIgnore("Aggregate already covered")
let
@ -690,52 +681,60 @@ proc validateAggregate*(
attesting_indices = get_attesting_indices(
epochRef, slot, committee_index, aggregate.aggregation_bits)
let deferredCrypto = batchCrypto
.scheduleAggregateChecks(
fork, genesis_validators_root,
signedAggregateAndProof, epochRef, attesting_indices
)
if deferredCrypto.isErr():
return checkedReject(deferredCrypto.error)
let
(aggregatorFut, slotFut, aggregateFut, sig) = deferredCrypto.get()
sig = if checkSignature:
let deferredCrypto = batchCrypto
.scheduleAggregateChecks(
fork, genesis_validators_root,
signedAggregateAndProof, epochRef, attesting_indices
)
if deferredCrypto.isErr():
return checkedReject(deferredCrypto.error)
block:
# [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid.
var x = await aggregatorFut
case x
of BatchResult.Invalid:
return checkedReject("Aggregate: invalid aggregator signature")
of BatchResult.Timeout:
beacon_aggregates_dropped_queue_full.inc()
return errIgnore("Aggregate: timeout checking aggregator signature")
of BatchResult.Valid:
discard
let
(aggregatorFut, slotFut, aggregateFut, sig) = deferredCrypto.get()
block:
# [REJECT] aggregate_and_proof.selection_proof
var x = await slotFut
case x
of BatchResult.Invalid:
return checkedReject("Aggregate: invalid slot signature")
of BatchResult.Timeout:
beacon_aggregates_dropped_queue_full.inc()
return errIgnore("Aggregate: timeout checking slot signature")
of BatchResult.Valid:
discard
block:
# [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid.
var x = await aggregatorFut
case x
of BatchResult.Invalid:
return checkedReject("Aggregate: invalid aggregator signature")
of BatchResult.Timeout:
beacon_aggregates_dropped_queue_full.inc()
return errIgnore("Aggregate: timeout checking aggregator signature")
of BatchResult.Valid:
discard
block:
# [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid.
var x = await aggregateFut
case x
of BatchResult.Invalid:
return checkedReject("Aggregate: invalid aggregate signature")
of BatchResult.Timeout:
beacon_aggregates_dropped_queue_full.inc()
return errIgnore("Aggregate: timeout checking aggregate signature")
of BatchResult.Valid:
discard
block:
# [REJECT] aggregate_and_proof.selection_proof
var x = await slotFut
case x
of BatchResult.Invalid:
return checkedReject("Aggregate: invalid slot signature")
of BatchResult.Timeout:
beacon_aggregates_dropped_queue_full.inc()
return errIgnore("Aggregate: timeout checking slot signature")
of BatchResult.Valid:
discard
block:
# [REJECT] The aggregator signature, signed_aggregate_and_proof.signature, is valid.
var x = await aggregateFut
case x
of BatchResult.Invalid:
return checkedReject("Aggregate: invalid aggregate signature")
of BatchResult.Timeout:
beacon_aggregates_dropped_queue_full.inc()
return errIgnore("Aggregate: timeout checking aggregate signature")
of BatchResult.Valid:
discard
sig
else:
let sig = aggregate.signature.load()
if not sig.isSome():
return checkedReject("Aggregate: unable to load signature")
sig.get()
# The following rule follows implicitly from that we clear out any
# unviable blocks from the chain dag:


@ -401,7 +401,7 @@ func toValidationError(
errIgnore($r.error)
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#light_client_finality_update
proc lightClientFinalityUpdateValidator*(
proc processLightClientFinalityUpdate*(
self: var LightClientProcessor, src: MsgSource,
finality_update: altair.LightClientFinalityUpdate
): Result[void, ValidationError] =
@ -414,7 +414,7 @@ proc lightClientFinalityUpdateValidator*(
v
# https://github.com/ethereum/consensus-specs/blob/vFuture/specs/altair/sync-protocol.md#light_client_optimistic_update
proc lightClientOptimisticUpdateValidator*(
proc processLightClientOptimisticUpdate*(
self: var LightClientProcessor, src: MsgSource,
optimistic_update: altair.LightClientOptimisticUpdate
): Result[void, ValidationError] =


@ -183,7 +183,7 @@ import
from
./gossip_processing/eth2_processor
import
lightClientFinalityUpdateValidator, lightClientOptimisticUpdateValidator
processLightClientFinalityUpdate, processLightClientOptimisticUpdate
declareCounter beacon_light_client_finality_updates_received,
"Number of valid LC finality updates processed by this node"
@ -287,12 +287,12 @@ proc installMessageValidators*(
lightClient.network.addValidator(
getLightClientFinalityUpdateTopic(digest),
proc(msg: altair.LightClientFinalityUpdate): ValidationResult =
validate(msg, lightClientFinalityUpdateValidator))
validate(msg, processLightClientFinalityUpdate))
lightClient.network.addValidator(
getLightClientOptimisticUpdateTopic(digest),
proc(msg: altair.LightClientOptimisticUpdate): ValidationResult =
validate(msg, lightClientOptimisticUpdateValidator))
validate(msg, processLightClientOptimisticUpdate))
const lightClientTopicParams = TopicParams.init()
static: lightClientTopicParams.validateParameters().tryGet()


@ -341,6 +341,25 @@ proc initFullNode(
node.network.peerPool, SyncQueueKind.Backward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
getFrontfillSlot, dag.backfill.slot, blockVerifier, maxHeadAge = 0)
router = (ref MessageRouter)(
processor: processor,
network: node.network,
)
if node.config.lightClientDataServe.get:
proc scheduleSendingLightClientUpdates(slot: Slot) =
if node.lightClientPool[].broadcastGossipFut != nil:
return
if slot <= node.lightClientPool[].latestBroadcastedSlot:
return
node.lightClientPool[].latestBroadcastedSlot = slot
template fut(): auto = node.lightClientPool[].broadcastGossipFut
fut = node.handleLightClientUpdates(slot)
fut.addCallback do (p: pointer) {.gcsafe.}:
fut = nil
router.onSyncCommitteeMessage = scheduleSendingLightClientUpdates
dag.setFinalizationCb makeOnFinalizationCb(node.eventBus, node.eth1Monitor)
dag.setBlockCb(onBlockAdded)
@ -359,6 +378,7 @@ proc initFullNode(
node.requestManager = RequestManager.init(node.network, blockVerifier)
node.syncManager = syncManager
node.backfiller = backfiller
node.router = router
debug "Loading validators", validatorsDir = config.validatorsDir()
@ -1399,7 +1419,7 @@ proc installMessageValidators(node: BeaconNode) =
node.network.addValidator(
getBeaconBlocksTopic(forkDigests.phase0),
proc (signedBlock: phase0.SignedBeaconBlock): ValidationResult =
toValidationResult(node.processor[].blockValidator(
toValidationResult(node.processor[].processSignedBeaconBlock(
MsgSource.gossip, signedBlock)))
template installPhase0Validators(digest: auto) =
@ -1411,7 +1431,7 @@ proc installMessageValidators(node: BeaconNode) =
# This proc needs to be within closureScope; don't lift out of loop.
proc(attestation: Attestation): Future[ValidationResult] {.async.} =
return toValidationResult(
await node.processor.attestationValidator(
await node.processor.processAttestation(
MsgSource.gossip, attestation, subnet_id)))
node.network.addAsyncValidator(
@ -1419,28 +1439,28 @@ proc installMessageValidators(node: BeaconNode) =
proc(signedAggregateAndProof: SignedAggregateAndProof):
Future[ValidationResult] {.async.} =
return toValidationResult(
await node.processor.aggregateValidator(
MsgSource.gossip, signedAggregateAndProof)))
await node.processor.processSignedAggregateAndProof(
MsgSource.gossip, signedAggregateAndProof, false)))
node.network.addValidator(
getAttesterSlashingsTopic(digest),
proc (attesterSlashing: AttesterSlashing): ValidationResult =
toValidationResult(
node.processor[].attesterSlashingValidator(
node.processor[].processAttesterSlashing(
MsgSource.gossip, attesterSlashing)))
node.network.addValidator(
getProposerSlashingsTopic(digest),
proc (proposerSlashing: ProposerSlashing): ValidationResult =
toValidationResult(
node.processor[].proposerSlashingValidator(
node.processor[].processProposerSlashing(
MsgSource.gossip, proposerSlashing)))
node.network.addValidator(
getVoluntaryExitsTopic(digest),
proc (signedVoluntaryExit: SignedVoluntaryExit): ValidationResult =
toValidationResult(
node.processor[].voluntaryExitValidator(
node.processor[].processSignedVoluntaryExit(
MsgSource.gossip, signedVoluntaryExit)))
installPhase0Validators(forkDigests.phase0)
@ -1453,13 +1473,13 @@ proc installMessageValidators(node: BeaconNode) =
node.network.addValidator(
getBeaconBlocksTopic(forkDigests.altair),
proc (signedBlock: altair.SignedBeaconBlock): ValidationResult =
toValidationResult(node.processor[].blockValidator(
toValidationResult(node.processor[].processSignedBeaconBlock(
MsgSource.gossip, signedBlock)))
node.network.addValidator(
getBeaconBlocksTopic(forkDigests.bellatrix),
proc (signedBlock: bellatrix.SignedBeaconBlock): ValidationResult =
toValidationResult(node.processor[].blockValidator(
toValidationResult(node.processor[].processSignedBeaconBlock(
MsgSource.gossip, signedBlock)))
template installSyncCommitteeeValidators(digest: auto) =
@ -1471,14 +1491,14 @@ proc installMessageValidators(node: BeaconNode) =
# This proc needs to be within closureScope; don't lift out of loop.
proc(msg: SyncCommitteeMessage): Future[ValidationResult] {.async.} =
return toValidationResult(
await node.processor.syncCommitteeMessageValidator(
await node.processor.processSyncCommitteeMessage(
MsgSource.gossip, msg, idx)))
node.network.addAsyncValidator(
getSyncCommitteeContributionAndProofTopic(digest),
proc(msg: SignedContributionAndProof): Future[ValidationResult] {.async.} =
return toValidationResult(
await node.processor.contributionValidator(
await node.processor.processSignedContributionAndProof(
MsgSource.gossip, msg)))
installSyncCommitteeeValidators(forkDigests.altair)


@ -12,7 +12,6 @@ import
./rest_utils,
../beacon_node, ../networking/eth2_network,
../consensus_object_pools/[blockchain_dag, exit_pool, spec_cache],
../validators/validator_duties,
../spec/[eth2_merkleization, forks, network, validator],
../spec/datatypes/[phase0, altair],
./state_ttl_cache
@ -744,7 +743,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
# https://ethereum.github.io/beacon-APIs/#/Beacon/publishBlock
router.api(MethodPost, "/eth/v1/beacon/blocks") do (
contentBody: Option[ContentBody]) -> RestApiResponse:
let forkedBlock =
let res =
block:
if contentBody.isNone():
return RestApiResponse.jsonError(Http400, EmptyRequestBodyError)
@ -760,12 +759,12 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
withBlck(forked):
blck.root = hash_tree_root(blck.message)
forked
await node.router.routeSignedBeaconBlock(blck)
let res = await node.sendBeaconBlock(forkedBlock)
if res.isErr():
return RestApiResponse.jsonError(Http503, BeaconNodeInSyncError)
if not(res.get()):
return RestApiResponse.jsonError(
Http503, BeaconNodeInSyncError, $res.error())
if res.get().isNone():
return RestApiResponse.jsonError(Http202, BlockValidationError)
return RestApiResponse.jsonMsgResponse(BlockValidationSuccess)
@ -908,18 +907,18 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
return RestApiResponse.jsonError(Http400,
InvalidCommitteeIndexValueError,
$rindex.error())
some(rindex.get())
Opt.some(rindex.get())
else:
none[CommitteeIndex]()
Opt.none(CommitteeIndex)
let vslot =
if slot.isSome():
let rslot = slot.get()
if rslot.isErr():
return RestApiResponse.jsonError(Http400, InvalidSlotValueError,
$rslot.error())
some(rslot.get())
Opt.some(rslot.get())
else:
none[Slot]()
Opt.none(Slot)
var res: seq[Attestation]
for item in node.attestationPool[].attestations(vslot, vindex):
res.add(item)
@ -945,7 +944,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
block:
var res: seq[Future[SendResult]]
for attestation in attestations:
res.add(node.sendAttestation(attestation))
res.add(node.router.routeAttestation(attestation))
res
let failures =
block:
@ -997,7 +996,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
InvalidAttesterSlashingObjectError,
$dres.error())
dres.get()
let res = await node.sendAttesterSlashing(slashing)
let res = await node.router.routeAttesterSlashing(slashing)
if res.isErr():
return RestApiResponse.jsonError(Http400,
AttesterSlashingValidationError,
@ -1029,7 +1028,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
InvalidProposerSlashingObjectError,
$dres.error())
dres.get()
let res = await node.sendProposerSlashing(slashing)
let res = await node.router.routeProposerSlashing(slashing)
if res.isErr():
return RestApiResponse.jsonError(Http400,
ProposerSlashingValidationError,
@ -1049,7 +1048,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
InvalidSyncCommitteeSignatureMessageError)
dres.get()
let results = await node.sendSyncCommitteeMessages(messages)
let results = await node.router.routeSyncCommitteeMessages(messages)
let failures =
block:
@ -1092,7 +1091,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
InvalidVoluntaryExitObjectError,
$dres.error())
dres.get()
let res = await node.sendVoluntaryExit(exit)
let res = await node.router.routeSignedVoluntaryExit(exit)
if res.isErr():
return RestApiResponse.jsonError(Http400,
VoluntaryExitValidationError,


@ -528,7 +528,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
block:
var res: seq[Future[SendResult]]
for proof in proofs:
res.add(node.sendAggregateAndProof(proof))
res.add(node.router.routeSignedAggregateAndProof(proof))
res
await allFutures(pending)
for future in pending:
@ -602,7 +602,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
get_committee_count_per_slot(epochRef), request.slot,
request.committee_index)
node.registerDuty(
node.actionTracker.registerDuty(
request.slot, subnet_id, request.validator_index,
request.is_aggregator)
@ -728,7 +728,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
block:
var res: seq[Future[SendResult]]
for proof in proofs:
res.add(node.sendSyncCommitteeContribution(proof, true))
res.add(node.router.routeSignedContributionAndProof(proof, true))
res
let failures =


@ -631,7 +631,7 @@ func shortLog*(v: SyncCommitteeContribution): auto =
(
slot: shortLog(v.slot),
beacon_block_root: shortLog(v.beacon_block_root),
subnetId: v.subcommittee_index,
subcommittee_index: v.subcommittee_index,
aggregation_bits: $v.aggregation_bits
)


@ -0,0 +1,500 @@
# beacon_chain
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
std/sequtils,
chronicles,
metrics,
../spec/network,
../consensus_object_pools/spec_cache,
../gossip_processing/eth2_processor,
../networking/eth2_network
export eth2_processor, eth2_network
# The "sent" counters capture messages that were sent via this beacon node
# regardless if they were produced internally or received via the REST API.
#
# Counters and histograms for timing-sensitive messages, only counters for
# the rest (aggregates don't affect rewards, so timing is less important)
const delayBuckets = [-Inf, -4.0, -2.0, -1.0, -0.5, -0.1, -0.05,
0.05, 0.1, 0.5, 1.0, 2.0, 4.0, 8.0, Inf]
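
For example, the attestation path below records its send delay with these
calls (mirroring routeAttestation; the delay is negative when a message goes
out before its deadline, which is why the bucket list extends below zero):

let
  sendTime = router[].processor.getCurrentBeaconTime()
  delay = sendTime - attestation.data.slot.attestation_deadline()
# e.g. an attestation broadcast 0.3s after its deadline records +0.3s,
# one sent 0.2s early records -0.2s
beacon_attestation_sent_delay.observe(delay.toFloatSeconds())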
declareCounter beacon_blocks_sent,
"Number of beacon blocks sent by this node"
declareHistogram beacon_blocks_sent_delay,
"Time(s) between expected and actual block send moment",
buckets = delayBuckets
declareCounter beacon_attestations_sent,
"Number of attestations sent by the node"
declareCounter beacon_aggregates_sent,
"Number of beacon chain attestations sent by the node"
declareHistogram beacon_attestation_sent_delay,
"Time(s) between expected and actual attestation send moment",
buckets = delayBuckets
declareCounter beacon_sync_committee_messages_sent,
"Number of sync committee messages sent by the node"
declareHistogram beacon_sync_committee_message_sent_delay,
"Time(s) between expected and actual sync committee message send moment",
buckets = delayBuckets
declareCounter beacon_sync_committee_contributions_sent,
"Number of sync committee contributions sent by the node"
declareCounter beacon_voluntary_exits_sent,
"Number of beacon voluntary exits sent by this node"
declareCounter beacon_attester_slashings_sent,
"Number of beacon attester slashings sent by this node"
declareCounter beacon_proposer_slashings_sent,
"Number of beacon proposer slashings sent by this node"
type
MessageRouter* = object
## The message router is responsible for routing messages produced by
## attached validators or received via REST.
##
## Message routing does 3 things:
##
## * perform a "quick" sanity check of the message similar to gossip
## processing - regardless where the message comes from, this check is
## done so as to protect the internal state of the beacon node
## * broadcast the message to the network - in general, the aim is to start
## the broadcasting as soon as possible without risking that the node
## gets descored
## * update the internal state of the beacon node with the data in the
## message - for example add a block to the dag or an attestation to the
## attestation pool and fork choice - as a consequence, the message will
## also be published to event subscribers
##
## Because the message router produces messages that will be gossiped, we
## run the messages through the same validation as incoming gossip messages.
##
## In most cases, processing of valid messages is identical to that done
## for gossip - blocks in particular however skip the queue.
processor*: ref Eth2Processor
network*: Eth2Node
# TODO this belongs somewhere else, ie sync committee pool
onSyncCommitteeMessage*: proc(slot: Slot) {.gcsafe, raises: [Defect].}
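
For orientation, construction and use mirror the wiring added to initFullNode
further down; a minimal hedged sketch (the surrounding `processor`, `node` and
`exit` variables are assumed to be in scope, as they are there):

let router = (ref MessageRouter)(
  processor: processor,   # the node's Eth2Processor
  network: node.network)
# Optional hook, fired after a sync committee message has been routed - the
# beacon node uses it to schedule light client update gossip
router.onSyncCommitteeMessage = proc(slot: Slot) {.gcsafe, raises: [Defect].} =
  discard
# Later, from an async context such as a REST handler:
#   let res = await router.routeSignedVoluntaryExit(exit)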
func isGoodForSending(validationResult: ValidationRes): bool =
# When routing messages from REST, it's possible that these have already
# been received via gossip (because they might have been sent to multiple
# beacon nodes, as is the case with Vouch) - thus, we treat `IGNORE`
# as success as far as further processing goes. `libp2p` however will not
# re-broadcast the message as it already exists in its cache.
validationResult.isOk() or
validationResult.error[0] == ValidationResult.Ignore
template dag(router: MessageRouter): ChainDAGRef =
router.processor[].dag
template quarantine(router: MessageRouter): ref Quarantine =
router.processor[].quarantine
template blockProcessor(router: MessageRouter): ref BlockProcessor =
router.processor[].blockProcessor
template getCurrentBeaconTime(router: MessageRouter): BeaconTime =
router.processor[].getCurrentBeaconTime()
type RouteBlockResult* = Result[Opt[BlockRef], cstring]
proc routeSignedBeaconBlock*(
router: ref MessageRouter, blck: ForkySignedBeaconBlock):
Future[RouteBlockResult] {.async.} =
## Validate and broadcast beacon block, then add it to the block database
## Returns the new Head when block is added successfully to dag, none when
## block passes validation but is not added, and error otherwise
let
wallTime = router[].getCurrentBeaconTime()
# Start with a quick gossip validation check such that broadcasting the
# block doesn't get the node into trouble
block:
let res = validateBeaconBlock(
router[].dag, router[].quarantine, blck, wallTime, {})
if not res.isGoodForSending():
warn "Block failed validation",
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
signature = shortLog(blck.signature), error = res.error()
return err(res.error()[1])
let
sendTime = router[].getCurrentBeaconTime()
delay = sendTime - blck.message.slot.block_deadline()
# The block passed basic gossip validation - we can "safely" broadcast it now.
# In fact, per the spec, we should broadcast it even if it later fails to
# apply to our state.
block:
let res = await router[].network.broadcastBeaconBlock(blck)
if res.isErr:
notice "Block not sent",
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
signature = shortLog(blck.signature), error = res.error()
return err(res.error())
let newBlockRef = await router[].blockProcessor.storeBlock(
MsgSource.api, sendTime, blck, true)
# The Opt[BlockRef] we return tells the caller whether the block was
# integrated into the chain
if newBlockRef.isErr():
warn "Unable to add routed block to block pool",
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
signature = shortLog(blck.signature), err = newBlockRef.error()
return ok(Opt.none(BlockRef))
beacon_blocks_sent.inc()
beacon_blocks_sent_delay.observe(delay.toFloatSeconds())
notice "Block sent",
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
signature = shortLog(blck.signature), delay
return ok(Opt.some(newBlockRef.get()))
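
A caller therefore sees three outcomes: hard failure, validated-but-not-stored,
and stored with a new BlockRef. A hedged sketch of handling them (exampleCaller
is hypothetical; proposeBlock and the REST block handler below do the equivalent):

proc exampleCaller(router: ref MessageRouter,
                   blck: phase0.SignedBeaconBlock) {.async.} =
  let res = await router.routeSignedBeaconBlock(blck)
  if res.isErr():
    return  # failed validation or could not be broadcast
  if res.get().isNone():
    return  # broadcast, but not (yet) added to the dag
  let newHead = res.get().get()  # BlockRef of the newly stored block
  discard newHead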
proc routeAttestation*(
router: ref MessageRouter, attestation: Attestation,
subnet_id: SubnetId, checkSignature: bool): Future[SendResult] {.async.} =
## Process and broadcast attestation - processing will register it with
## the attestation pool
block:
let res = await router[].processor.processAttestation(
MsgSource.api, attestation, subnet_id, checkSignature)
if not res.isGoodForSending:
warn "Attestation failed validation",
attestation = shortLog(attestation), error = res.error()
return err(res.error()[1])
let
sendTime = router[].processor.getCurrentBeaconTime()
delay = sendTime - attestation.data.slot.attestation_deadline()
block:
let res = await router[].network.broadcastAttestation(
subnet_id, attestation)
if not res.isOk:
notice "Attestation not sent",
attestation = shortLog(attestation), error = res.error()
return err(res.error())
beacon_attestations_sent.inc()
beacon_attestation_sent_delay.observe(delay.toFloatSeconds())
notice "Attestation sent",
attestation = shortLog(attestation), delay, subnet_id
return ok()
proc routeAttestation*(
router: ref MessageRouter, attestation: Attestation):
Future[SendResult] {.async.} =
# Compute subnet, then route attestation
let
target = router[].dag.getBlockRef(attestation.data.target.root).valueOr:
notice "Attempt to send attestation for unknown target",
attestation = shortLog(attestation)
return err(
"Attempt to send attestation for unknown target")
epochRef = router[].dag.getEpochRef(
target, attestation.data.target.epoch, false).valueOr:
warn "Cannot construct EpochRef for attestation, skipping send - report bug",
target = shortLog(target),
attestation = shortLog(attestation)
return
committee_index =
epochRef.get_committee_index(attestation.data.index).valueOr:
notice "Invalid committee index in attestation",
attestation = shortLog(attestation)
return err("Invalid committee index in attestation")
subnet_id = compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), attestation.data.slot,
committee_index)
return await router.routeAttestation(
attestation, subnet_id, checkSignature = true)
proc routeSignedAggregateAndProof*(
router: ref MessageRouter, proof: SignedAggregateAndProof,
checkSignature = true):
Future[SendResult] {.async.} =
## Validate and broadcast aggregate
block:
# Because the aggregate was (most likely) produced by this beacon node,
# we already know all attestations in it - we skip the coverage check so
# that all processing happens anyway
let res = await router[].processor.processSignedAggregateAndProof(
MsgSource.api, proof, checkSignature = checkSignature,
checkCover = false)
if not res.isGoodForSending:
warn "Aggregated attestation failed validation",
attestation = shortLog(proof.message.aggregate),
aggregator_index = proof.message.aggregator_index,
signature = shortLog(proof.signature), error = res.error()
return err(res.error()[1])
let
sendTime = router[].processor.getCurrentBeaconTime()
delay = sendTime - proof.message.aggregate.data.slot.aggregate_deadline()
block:
let res = await router[].network.broadcastAggregateAndProof(proof)
if not res.isOk():
notice "Aggregated attestation not sent",
attestation = shortLog(proof.message.aggregate),
aggregator_index = proof.message.aggregator_index,
signature = shortLog(proof.signature), error = res.error()
return err(res.error())
beacon_aggregates_sent.inc()
notice "Aggregated attestation sent",
attestation = shortLog(proof.message.aggregate),
aggregator_index = proof.message.aggregator_index,
selection_proof = shortLog(proof.message.selection_proof),
signature = shortLog(proof.signature), delay
return ok()
proc routeSyncCommitteeMessage*(
router: ref MessageRouter, msg: SyncCommitteeMessage,
subcommitteeIdx: SyncSubcommitteeIndex,
checkSignature: bool): Future[SendResult] {.async.} =
block:
let res = await router[].processor.processSyncCommitteeMessage(
MsgSource.api, msg, subcommitteeIdx, checkSignature)
if not res.isGoodForSending:
warn "Sync committee message failed validation",
message = shortLog(msg), error = res.error()
return err(res.error()[1])
let
sendTime = router[].processor.getCurrentBeaconTime()
delay = sendTime - msg.slot.sync_committee_message_deadline()
block:
let res = await router[].network.broadcastSyncCommitteeMessage(
msg, subcommitteeIdx)
if not res.isOk():
notice "Sync committee message not sent",
message = shortLog(msg), error = res.error()
return err(res.error())
beacon_sync_committee_messages_sent.inc()
beacon_sync_committee_message_sent_delay.observe(delay.toFloatSeconds())
notice "Sync committee message sent", message = shortLog(msg), delay
if router[].onSyncCommitteeMessage != nil:
router[].onSyncCommitteeMessage(msg.slot)
return ok()
proc routeSyncCommitteeMessages*(
router: ref MessageRouter, msgs: seq[SyncCommitteeMessage]):
Future[seq[SendResult]] {.async.} =
return withState(router[].dag.headState):
when stateFork >= BeaconStateFork.Altair:
var statuses = newSeq[Option[SendResult]](len(msgs))
let
curPeriod = sync_committee_period(state.data.slot)
nextPeriod = curPeriod + 1
let (keysCur, keysNxt) =
block:
var resCur: Table[uint64, int]
var resNxt: Table[uint64, int]
for index, msg in msgs:
if msg.validator_index < lenu64(state.data.validators):
let msgPeriod = sync_committee_period(msg.slot + 1)
if msgPeriod == curPeriod:
resCur[msg.validator_index] = index
elif msgPeriod == nextPeriod:
resNxt[msg.validator_index] = index
else:
statuses[index] =
some(SendResult.err("Message's slot out of state's head range"))
else:
statuses[index] = some(SendResult.err("Incorrect validator's index"))
if (len(resCur) == 0) and (len(resNxt) == 0):
return statuses.mapIt(it.get())
(resCur, resNxt)
let (pending, indices) = block:
var resFutures: seq[Future[SendResult]]
var resIndices: seq[int]
template headSyncCommittees(): auto = router[].dag.headSyncCommittees
for subcommitteeIdx in SyncSubcommitteeIndex:
for valKey in syncSubcommittee(
headSyncCommittees.current_sync_committee, subcommitteeIdx):
let index = keysCur.getOrDefault(uint64(valKey), -1)
if index >= 0:
resIndices.add(index)
resFutures.add(router.routeSyncCommitteeMessage(
msgs[index], subcommitteeIdx, true))
for subcommitteeIdx in SyncSubcommitteeIndex:
for valKey in syncSubcommittee(
headSyncCommittees.next_sync_committee, subcommitteeIdx):
let index = keysNxt.getOrDefault(uint64(valKey), -1)
if index >= 0:
resIndices.add(index)
resFutures.add(router.routeSyncCommitteeMessage(
msgs[index], subcommitteeIdx, true))
(resFutures, resIndices)
await allFutures(pending)
for index, future in pending:
if future.done():
let fres = future.read()
if fres.isErr():
statuses[indices[index]] = some(SendResult.err(fres.error()))
else:
statuses[indices[index]] = some(SendResult.ok())
elif future.failed() or future.cancelled():
let exc = future.readError()
debug "Unexpected failure while sending committee message",
message = msgs[indices[index]], error = $exc.msg
statuses[indices[index]] = some(SendResult.err(
"Unexpected failure while sending committee message"))
var res: seq[SendResult]
for item in statuses:
if item.isSome():
res.add(item.get())
else:
res.add(SendResult.err("Message validator not in sync committee"))
res
else:
var res: seq[SendResult]
for _ in msgs:
res.add(SendResult.err("Waiting for altair fork"))
res
proc routeSignedContributionAndProof*(
router: ref MessageRouter,
msg: SignedContributionAndProof,
checkSignature: bool): Future[SendResult] {.async.} =
block:
let res = await router[].processor.processSignedContributionAndProof(
MsgSource.api, msg)
if not res.isGoodForSending:
warn "Contribution failed validation",
contribution = shortLog(msg.message.contribution),
aggregator_index = msg.message.aggregator_index,
selection_proof = shortLog(msg.message.selection_proof),
signature = shortLog(msg.signature), error = res.error()
return err(res.error()[1])
let
sendTime = router[].processor.getCurrentBeaconTime()
delay = sendTime - msg.message.contribution.slot.sync_contribution_deadline()
block:
let res = await router[].network.broadcastSignedContributionAndProof(msg)
if not res.isOk():
notice "Contribution not sent",
contribution = shortLog(msg.message.contribution),
aggregator_index = msg.message.aggregator_index,
selection_proof = shortLog(msg.message.selection_proof),
signature = shortLog(msg.signature), error = res.error()
return err(res.error())
beacon_sync_committee_contributions_sent.inc()
notice "Contribution sent",
contribution = shortLog(msg.message.contribution),
aggregator_index = msg.message.aggregator_index,
selection_proof = shortLog(msg.message.selection_proof),
signature = shortLog(msg.signature), delay
return ok()
proc routeSignedVoluntaryExit*(
router: ref MessageRouter, exit: SignedVoluntaryExit):
Future[SendResult] {.async.} =
block:
let res =
router[].processor[].processSignedVoluntaryExit(MsgSource.api, exit)
if not res.isGoodForSending:
warn "Voluntary exit failed validation",
exit = shortLog(exit), error = res.error()
return err(res.error()[1])
block:
let res = await router[].network.broadcastVoluntaryExit(exit)
if not res.isOk():
notice "Voluntary exit not sent",
exit = shortLog(exit), error = res.error()
return err(res.error())
beacon_voluntary_exits_sent.inc()
notice "Voluntary exit sent", exit = shortLog(exit)
return ok()
proc routeAttesterSlashing*(
router: ref MessageRouter, slashing: AttesterSlashing):
Future[SendResult] {.async.} =
block:
let res =
router[].processor[].processAttesterSlashing(MsgSource.api, slashing)
if not res.isGoodForSending:
warn "Attester slashing failed validation",
slashing = shortLog(slashing), error = res.error()
return err(res.error()[1])
block:
let res = await router[].network.broadcastAttesterSlashing(slashing)
if not res.isOk():
notice "Attester slashing not sent",
slashing = shortLog(slashing), error = res.error()
return err(res.error())
beacon_attester_slashings_sent.inc()
notice "Attester slashing sent", slashing = shortLog(slashing)
return ok()
proc routeProposerSlashing*(
router: ref MessageRouter, slashing: ProposerSlashing):
Future[SendResult] {.async.} =
block:
let res =
router[].processor[].processProposerSlashing(MsgSource.api, slashing)
if not res.isGoodForSending:
warn "Proposer slashing request failed validation",
slashing = shortLog(slashing), error = res.error()
return err(res.error()[1])
block:
let res = await router[].network.broadcastProposerSlashing(slashing)
if not res.isOk():
notice "Proposer slashing not sent",
slashing = shortLog(slashing), error = res.error()
return err(res.error())
beacon_proposer_slashings_sent.inc()
notice "Proposer slashing sent", slashing = shortLog(slashing)
return ok()


@ -7,13 +7,17 @@
{.push raises: [Defect].}
# This module is responsible for handling beacon node validators, ie those
# that are running directly in the beacon node and not in a separate validator
# client process
# References to `vFuture` refer to the pre-release proposal of the libp2p based
# light client sync protocol. Conflicting release versions are not in use.
# https://github.com/ethereum/consensus-specs/pull/2802
import
# Standard library
std/[os, sequtils, tables],
std/[os, tables],
# Nimble packages
stew/[byteutils, objects],
@ -47,23 +51,6 @@ from web3/engine_api_types import PayloadExecutionStatus
const delayBuckets = [-Inf, -4.0, -2.0, -1.0, -0.5, -0.1, -0.05,
0.05, 0.1, 0.5, 1.0, 2.0, 4.0, 8.0, Inf]
declareCounter beacon_attestations_sent,
"Number of beacon chain attestations sent by this peer"
declareHistogram beacon_attestation_sent_delay,
"Time(s) between slot start and attestation sent moment",
buckets = delayBuckets
declareCounter beacon_sync_committee_messages_sent,
"Number of sync committee messages sent by this peer"
declareCounter beacon_sync_committee_contributions_sent,
"Number of sync committee contributions sent by this peer"
declareHistogram beacon_sync_committee_message_sent_delay,
"Time(s) between slot start and sync committee message sent moment",
buckets = delayBuckets
declareCounter beacon_light_client_finality_updates_sent,
"Number of LC finality updates sent by this peer"
@ -83,7 +70,6 @@ declarePublicGauge(attached_validator_balance_total,
logScope: topics = "beacval"
type
SendBlockResult* = Result[bool, cstring]
ForkedBlockResult* = Result[ForkedBeaconBlock, string]
proc findValidator(validators: auto, pubkey: ValidatorPubKey):
@ -213,41 +199,7 @@ proc isSynced*(node: BeaconNode, head: BlockRef): bool =
else:
not node.dag.is_optimistic(head.root)
func isGoodForSending(validationResult: ValidationRes): bool =
# Validator clients such as Vouch can be configured to work with multiple
# beacon nodes simultaneously. In this configuration, the validator client
# will try to broadcast the gossip messages through each of the connected
# beacon nodes which may lead to a situation where some of the nodes see a
# message arriving from the network before it arrives through the REST API.
# This should not be considered an error and the beacon node should still
# broadcast the message as the intented purpose of the Vouch strategy is
# to ensure that the message will reach as many peers as possible.
validationResult.isOk() or validationResult.error[0] == ValidationResult.Ignore
proc sendAttestation(
node: BeaconNode, attestation: Attestation,
subnet_id: SubnetId, checkSignature: bool): Future[SendResult] {.async.} =
# Validate attestation before sending it via gossip - validation will also
# register the attestation with the attestation pool. Notably, although
# libp2p calls the data handler for any subscription on the subnet
# topic, it does not perform validation.
let res = await node.processor.attestationValidator(
MsgSource.api, attestation, subnet_id, checkSignature)
return
if res.isGoodForSending:
let sendResult =
await node.network.broadcastAttestation(subnet_id, attestation)
if sendResult.isOk:
beacon_attestations_sent.inc()
sendResult
else:
notice "Produced attestation failed validation",
attestation = shortLog(attestation),
error = res.error()
err(res.error()[1])
proc handleLightClientUpdates(node: BeaconNode, slot: Slot) {.async.} =
proc handleLightClientUpdates*(node: BeaconNode, slot: Slot) {.async.} =
static: doAssert lightClientFinalityUpdateSlotOffset ==
lightClientOptimisticUpdateSlotOffset
let sendTime = node.beaconClock.fromNow(
@ -298,146 +250,6 @@ proc handleLightClientUpdates(node: BeaconNode, slot: Slot) {.async.} =
warn "LC optimistic update failed to send",
error = sendResult.error()
proc scheduleSendingLightClientUpdates(node: BeaconNode, slot: Slot) =
if not node.config.lightClientDataServe.get:
return
if node.lightClientPool[].broadcastGossipFut != nil:
return
if slot <= node.lightClientPool[].latestBroadcastedSlot:
return
node.lightClientPool[].latestBroadcastedSlot = slot
template fut(): auto = node.lightClientPool[].broadcastGossipFut
fut = node.handleLightClientUpdates(slot)
fut.addCallback do (p: pointer) {.gcsafe.}:
fut = nil
proc sendSyncCommitteeMessage(
node: BeaconNode, msg: SyncCommitteeMessage,
subcommitteeIdx: SyncSubcommitteeIndex,
checkSignature: bool): Future[SendResult] {.async.} =
# Validate sync committee message before sending it via gossip
# validation will also register the message with the sync committee
# message pool. Notably, although libp2p calls the data handler for
# any subscription on the subnet topic, it does not perform validation.
let res = await node.processor.syncCommitteeMessageValidator(
MsgSource.api, msg, subcommitteeIdx, checkSignature)
return
if res.isGoodForSending:
let sendResult =
await node.network.broadcastSyncCommitteeMessage(msg, subcommitteeIdx)
if sendResult.isOk:
beacon_sync_committee_messages_sent.inc()
node.scheduleSendingLightClientUpdates(msg.slot)
sendResult
else:
notice "Sync committee message failed validation",
msg, error = res.error()
SendResult.err(res.error()[1])
proc sendSyncCommitteeMessages*(node: BeaconNode,
msgs: seq[SyncCommitteeMessage]
): Future[seq[SendResult]] {.async.} =
return withState(node.dag.headState):
when stateFork >= BeaconStateFork.Altair:
var statuses = newSeq[Option[SendResult]](len(msgs))
let
curPeriod = sync_committee_period(state.data.slot)
nextPeriod = curPeriod + 1
let (keysCur, keysNxt) =
block:
var resCur: Table[uint64, int]
var resNxt: Table[uint64, int]
for index, msg in msgs:
if msg.validator_index < lenu64(state.data.validators):
let msgPeriod = sync_committee_period(msg.slot + 1)
if msgPeriod == curPeriod:
resCur[msg.validator_index] = index
elif msgPeriod == nextPeriod:
resNxt[msg.validator_index] = index
else:
statuses[index] =
some(SendResult.err("Message's slot out of state's head range"))
else:
statuses[index] = some(SendResult.err("Incorrect validator's index"))
if (len(resCur) == 0) and (len(resNxt) == 0):
return statuses.mapIt(it.get())
(resCur, resNxt)
let (pending, indices) = block:
var resFutures: seq[Future[SendResult]]
var resIndices: seq[int]
template headSyncCommittees(): auto = node.dag.headSyncCommittees
for subcommitteeIdx in SyncSubcommitteeIndex:
for valKey in syncSubcommittee(
headSyncCommittees.current_sync_committee, subcommitteeIdx):
let index = keysCur.getOrDefault(uint64(valKey), -1)
if index >= 0:
resIndices.add(index)
resFutures.add(node.sendSyncCommitteeMessage(
msgs[index], subcommitteeIdx, true))
for subcommitteeIdx in SyncSubcommitteeIndex:
for valKey in syncSubcommittee(
headSyncCommittees.next_sync_committee, subcommitteeIdx):
let index = keysNxt.getOrDefault(uint64(valKey), -1)
if index >= 0:
resIndices.add(index)
resFutures.add(node.sendSyncCommitteeMessage(
msgs[index], subcommitteeIdx, true))
(resFutures, resIndices)
await allFutures(pending)
for index, future in pending:
if future.done():
let fres = future.read()
if fres.isErr():
statuses[indices[index]] = some(SendResult.err(fres.error()))
else:
statuses[indices[index]] = some(SendResult.ok())
elif future.failed() or future.cancelled():
let exc = future.readError()
debug "Unexpected failure while sending committee message",
message = msgs[indices[index]], error = $exc.msg
statuses[indices[index]] = some(SendResult.err(
"Unexpected failure while sending committee message"))
var res: seq[SendResult]
for item in statuses:
if item.isSome():
res.add(item.get())
else:
res.add(SendResult.err("Message validator not in sync committee"))
res
else:
var res: seq[SendResult]
for _ in msgs:
res.add(SendResult.err("Waiting for altair fork"))
res
proc sendSyncCommitteeContribution*(
node: BeaconNode,
msg: SignedContributionAndProof,
checkSignature: bool): Future[SendResult] {.async.} =
let res = await node.processor.contributionValidator(
MsgSource.api, msg, checkSignature)
return
if res.isGoodForSending:
let sendResult =
await node.network.broadcastSignedContributionAndProof(msg)
if sendResult.isOk:
beacon_sync_committee_contributions_sent.inc()
sendResult
else:
notice "Sync committee contribution failed validation",
msg, error = res.error()
err(res.error()[1])
proc createAndSendAttestation(node: BeaconNode,
fork: Fork,
genesis_validators_root: Eth2Digest,
@ -461,27 +273,14 @@ proc createAndSendAttestation(node: BeaconNode,
[uint64 indexInCommittee], committeeLen, data, signature).expect(
"valid data")
let res = await node.sendAttestation(
# Logged in the router
let res = await node.router.routeAttestation(
attestation, subnet_id, checkSignature = false)
if not res.isOk():
warn "Attestation failed",
validator = shortLog(validator),
attestation = shortLog(attestation),
error = res.error()
return
if node.config.dumpEnabled:
dump(node.config.dumpDirOutgoing, attestation.data, validator.pubkey)
let
wallTime = node.beaconClock.now()
delay = wallTime - data.slot.attestation_deadline()
notice "Attestation sent",
attestation = shortLog(attestation), validator = shortLog(validator),
delay, subnet_id
beacon_attestation_sent_delay.observe(delay.toFloatSeconds())
except CatchableError as exc:
# An error could happen here when the signature task fails - we must
# not leak the exception because this is an asyncSpawn task
@ -740,8 +539,8 @@ proc proposeBlock(node: BeaconNode,
return head
res.get()
var newBlock = await makeBeaconBlockForHeadAndSlot(
node, randao, validator_index, node.graffitiBytes, head, slot)
newBlock = await makeBeaconBlockForHeadAndSlot(
node, randao, validator_index, node.graffitiBytes, head, slot)
if newBlock.isErr():
return head # already logged elsewhere!
@ -787,36 +586,12 @@ proc proposeBlock(node: BeaconNode,
message: blck, signature: signature, root: blockRoot)
else:
static: doAssert "Unknown SignedBeaconBlock type"
newBlockRef =
(await node.router.routeSignedBeaconBlock(signedBlock)).valueOr:
return head # Errors logged in router
# We produced the block using a state transition, meaning the block is valid
# enough that it will not be rejected by gossip - it is unlikely but
# possible that it will be ignored due to extreme timing conditions, for
# example a delay in signing.
# We'll start broadcasting it before integrating fully in the chaindag
# so that it can start propagating through the network ASAP.
let sendResult = await node.network.broadcastBeaconBlock(signedBlock)
if sendResult.isErr:
warn "Block failed to send",
blockRoot = shortLog(blockRoot), blck = shortLog(blck),
signature = shortLog(signature), validator = shortLog(validator),
error = sendResult.error()
return head
let
wallTime = node.beaconClock.now()
# storeBlock puts the block in the chaindag, and if accepted, takes care
# of side effects such as event api notification
newBlockRef = await node.blockProcessor.storeBlock(
MsgSource.api, wallTime, signedBlock, true)
if newBlockRef.isErr:
warn "Unable to add proposed block to block pool",
blockRoot = shortLog(blockRoot), blck = shortLog(blck),
signature = shortLog(signature), validator = shortLog(validator)
return head
if newBlockRef.isNone():
return head # Validation errors logged in router
notice "Block proposed",
blockRoot = shortLog(blockRoot), blck = shortLog(blck),
@ -869,13 +644,11 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
# using empty slots as fillers.
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#validator-assignments
let
epochRef = block:
let tmp = node.dag.getEpochRef(attestationHead.blck, slot.epoch, false)
if isErr(tmp):
epochRef = node.dag.getEpochRef(
attestationHead.blck, slot.epoch, false).valueOr:
warn "Cannot construct EpochRef for attestation head, report bug",
attestationHead = shortLog(attestationHead), slot
return
tmp.get()
committees_per_slot = get_committee_count_per_slot(epochRef)
fork = node.dag.forkAtEpoch(slot.epoch)
genesis_validators_root = node.dag.genesis_validators_root
@ -913,8 +686,8 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
badVoteDetails = $registered.error()
proc createAndSendSyncCommitteeMessage(node: BeaconNode,
slot: Slot,
validator: AttachedValidator,
slot: Slot,
subcommitteeIdx: SyncSubcommitteeIndex,
head: BlockRef) {.async.} =
try:
@ -932,26 +705,15 @@ proc createAndSendSyncCommitteeMessage(node: BeaconNode,
return
res.get()
let res = await node.sendSyncCommitteeMessage(
# Logged in the router
let res = await node.router.routeSyncCommitteeMessage(
msg, subcommitteeIdx, checkSignature = false)
if res.isErr():
warn "Sync committee message failed",
error = res.error()
if not res.isOk():
return
if node.config.dumpEnabled:
dump(node.config.dumpDirOutgoing, msg, validator.pubkey)
let
wallTime = node.beaconClock.now()
delay = wallTime - msg.slot.sync_committee_message_deadline()
notice "Sync committee message sent",
message = shortLog(msg),
validator = shortLog(validator),
delay
beacon_sync_committee_message_sent_delay.observe(delay.toFloatSeconds())
except CatchableError as exc:
# An error could happen here when the signature task fails - we must
# not leak the exception because this is an asyncSpawn task
@ -959,122 +721,93 @@ proc createAndSendSyncCommitteeMessage(node: BeaconNode,
proc handleSyncCommitteeMessages(node: BeaconNode, head: BlockRef, slot: Slot) =
# TODO Use a view type to avoid the copy
var syncCommittee = node.dag.syncCommitteeParticipants(slot + 1)
var
syncCommittee = node.dag.syncCommitteeParticipants(slot + 1)
epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr:
warn "Cannot construct EpochRef for head, report bug",
attestationHead = shortLog(head), slot
return
for subcommitteeIdx in SyncSubcommitteeIndex:
for valIdx in syncSubcommittee(syncCommittee, subcommitteeIdx):
let validator = node.getAttachedValidator(
getStateField(node.dag.headState, validators), valIdx)
let validator = node.getAttachedValidator(epochRef, valIdx)
if isNil(validator) or validator.index.isNone():
continue
asyncSpawn createAndSendSyncCommitteeMessage(node, slot, validator,
asyncSpawn createAndSendSyncCommitteeMessage(node, validator, slot,
subcommitteeIdx, head)
proc signAndSendContribution(node: BeaconNode,
                             validator: AttachedValidator,
                             contribution: SyncCommitteeContribution,
                             selectionProof: ValidatorSig) {.async.} =
  try:
    let
      fork = node.dag.forkAtEpoch(contribution.slot.epoch)
      genesis_validators_root = node.dag.genesis_validators_root
      msg = (ref SignedContributionAndProof)(
        message: ContributionAndProof(
          aggregator_index: uint64 validator.index.get,
          contribution: contribution,
          selection_proof: selectionProof))

    msg[].signature = block:
      let res = await validator.getContributionAndProofSignature(
        fork, genesis_validators_root, msg[].message)

      if res.isErr():
        warn "Unable to sign sync committee contribution",
          validator = shortLog(validator), error_msg = res.error()
        return
      res.get()

    # Failures logged in sendSyncCommitteeContribution
    discard await node.sendSyncCommitteeContribution(msg[], false)
    notice "Contribution sent", contribution = shortLog(msg[])

proc signAndSendContribution(node: BeaconNode,
                             validator: AttachedValidator,
                             subcommitteeIdx: SyncSubcommitteeIndex,
                             head: BlockRef,
                             slot: Slot) {.async.} =
  try:
    let
      fork = node.dag.forkAtEpoch(slot.epoch)
      genesis_validators_root = node.dag.genesis_validators_root
      selectionProof = block:
        let res = await validator.getSyncCommitteeSelectionProof(
          fork, genesis_validators_root, slot, subcommitteeIdx)
        if res.isErr():
          warn "Unable to generate committee selection proof",
            validator = shortLog(validator), slot,
            subnet_id = subcommitteeIdx, error = res.error()
          return
        res.get()

    if not is_sync_committee_aggregator(selectionProof):
      return

    var
      msg = SignedContributionAndProof(
        message: ContributionAndProof(
          aggregator_index: uint64 validator.index.get,
          selection_proof: selectionProof))

    if not node.syncCommitteeMsgPool[].produceContribution(
        slot,
        head.root,
        subcommitteeIdx,
        msg.message.contribution):
      return

    msg.signature = block:
      let res = await validator.getContributionAndProofSignature(
        fork, genesis_validators_root, msg.message)

      if res.isErr():
        warn "Unable to sign sync committee contribution",
          validator = shortLog(validator), message = shortLog(msg.message),
          error_msg = res.error()
        return
      res.get()

    # Logged in the router
    discard await node.router.routeSignedContributionAndProof(msg, false)
except CatchableError as exc:
# An error could happen here when the signature task fails - we must
# not leak the exception because this is an asyncSpawn task
warn "Error sending sync committee contribution", err = exc.msg
proc handleSyncCommitteeContributions(node: BeaconNode,
head: BlockRef, slot: Slot) {.async.} =
# TODO Use a view type to avoid the copy
proc handleSyncCommitteeContributions(
node: BeaconNode, head: BlockRef, slot: Slot) {.async.} =
let
fork = node.dag.forkAtEpoch(slot.epoch)
genesis_validators_root = node.dag.genesis_validators_root
syncCommittee = node.dag.syncCommitteeParticipants(slot + 1)
epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr:
warn "Cannot construct EpochRef for head, report bug",
attestationHead = shortLog(head), slot
return
type
AggregatorCandidate = object
validator: AttachedValidator
subcommitteeIdx: SyncSubcommitteeIndex
var candidateAggregators: seq[AggregatorCandidate]
var selectionProofs: seq[Future[SignatureResult]]
var time = timeIt:
for subcommitteeIdx in SyncSubcommitteeIndex:
# TODO Hoist outside of the loop with a view type
# to avoid the repeated offset calculations
for valIdx in syncSubcommittee(syncCommittee, subcommitteeIdx):
let validator = node.getAttachedValidator(
getStateField(node.dag.headState, validators), valIdx)
if validator == nil:
continue
candidateAggregators.add AggregatorCandidate(
validator: validator,
subcommitteeIdx: subcommitteeIdx)
selectionProofs.add validator.getSyncCommitteeSelectionProof(
fork, genesis_validators_root, slot, subcommitteeIdx)
await allFutures(selectionProofs)
debug "Prepared contributions selection proofs",
count = selectionProofs.len, time
var contributionsSent = 0
time = timeIt:
for i, proof in selectionProofs:
if not proof.completed:
for subcommitteeIdx in SyncSubCommitteeIndex:
for valIdx in syncSubcommittee(syncCommittee, subcommitteeIdx):
let validator = node.getAttachedValidator(epochRef, valIdx)
if validator == nil:
continue
let selectionProofRes = proof.read()
if selectionProofRes.isErr():
warn "Unable to generate selection proof",
validator = shortLog(candidateAggregators[i].validator),
slot, head, subnet_id = candidateAggregators[i].subcommitteeIdx
continue
let selectionProof = selectionProofRes.get()
if not is_sync_committee_aggregator(selectionProof):
continue
var contribution: SyncCommitteeContribution
let contributionWasProduced =
node.syncCommitteeMsgPool[].produceContribution(
slot,
head.root,
candidateAggregators[i].subcommitteeIdx,
contribution)
if contributionWasProduced:
asyncSpawn signAndSendContribution(
node,
candidateAggregators[i].validator,
contribution,
selectionProof)
inc contributionsSent
else:
debug "Failure to produce contribution",
slot, head, subnet_id = candidateAggregators[i].subcommitteeIdx
asyncSpawn signAndSendContribution(
node, validator, subcommitteeIdx, head, slot)
proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
Future[BlockRef] {.async.} =
@@ -1102,29 +835,57 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
else:
await proposeBlock(node, validator, proposer.get(), head, slot)
proc makeAggregateAndProof(
    pool: var AttestationPool, epochRef: EpochRef, slot: Slot,
    committee_index: CommitteeIndex,
    validator_index: ValidatorIndex,
    slot_signature: ValidatorSig): Opt[AggregateAndProof] =
  doAssert validator_index in get_beacon_committee(epochRef, slot, committee_index)

  # TODO for testing purposes, refactor this into the condition check
  # and just calculation
  # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#aggregation-selection
  if not is_aggregator(epochRef, slot, committee_index, slot_signature):
    return err()

  let maybe_slot_attestation = getAggregatedAttestation(pool, slot, committee_index)
  if maybe_slot_attestation.isNone:
    return err()

  # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#construct-aggregate
  # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#aggregateandproof
  ok(AggregateAndProof(
    aggregator_index: validator_index.uint64,
    aggregate: maybe_slot_attestation.get,
    selection_proof: slot_signature))

proc signAndSendAggregate(
    node: BeaconNode, validator: AttachedValidator, epochRef: EpochRef,
    slot: Slot, committee_index: CommitteeIndex) {.async.} =
  try:
    let
      fork = node.dag.forkAtEpoch(slot.epoch)
      genesis_validators_root = node.dag.genesis_validators_root
      validator_index = validator.index.get()
      selectionProof = block:
        let res = await validator.getSlotSignature(
          fork, genesis_validators_root, slot)
        if res.isErr():
          warn "Unable to create slot signature",
            validator = shortLog(validator),
            slot, error = res.error()
          return
        res.get()

    # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#aggregation-selection
    if not is_aggregator(epochRef, slot, committee_index, selectionProof):
      return

    # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#construct-aggregate
    # https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#aggregateandproof
    var
      msg = SignedAggregateAndProof(
        message: AggregateAndProof(
          aggregator_index: uint64 validator_index,
          selection_proof: selectionProof))

    msg.message.aggregate = node.attestationPool[].getAggregatedAttestation(
        slot, committee_index).valueOr:
      return

    msg.signature = block:
      let res = await validator.getAggregateAndProofSignature(
        fork, genesis_validators_root, msg.message)

      if res.isErr():
        warn "Unable to sign aggregate",
          validator = shortLog(validator), error_msg = res.error()
        return
      res.get()

    # Logged in the router
    discard await node.router.routeSignedAggregateAndProof(
      msg, checkSignature = false)
except CatchableError as exc:
# An error could happen here when the signature task fails - we must
# not leak the exception because this is an asyncSpawn task
warn "Error sending aggregate", err = exc.msg
proc sendAggregatedAttestations(
node: BeaconNode, head: BlockRef, slot: Slot) {.async.} =
@@ -1132,87 +893,19 @@ proc sendAggregatedAttestations(
  # the given slot, for which `is_aggregator` returns `true`.
let
epochRef = block:
let tmp = node.dag.getEpochRef(head, slot.epoch, false)
if isErr(tmp): # Some unusual race condition perhaps?
warn "Cannot construct EpochRef for head, report bug",
head = shortLog(head), slot
return
tmp.get()
fork = node.dag.forkAtEpoch(slot.epoch)
genesis_validators_root = node.dag.genesis_validators_root
epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr:
warn "Cannot construct EpochRef for head, report bug",
head = shortLog(head), slot
return
committees_per_slot = get_committee_count_per_slot(epochRef)
var
slotSigs: seq[Future[SignatureResult]] = @[]
slotSigsData: seq[tuple[committee_index: CommitteeIndex,
validator_index: ValidatorIndex,
v: AttachedValidator]] = @[]
for committee_index in get_committee_indices(committees_per_slot):
let committee = get_beacon_committee(epochRef, slot, committee_index)
for index_in_committee, validator_index in committee:
for _, validator_index in
get_beacon_committee(epochRef, slot, committee_index):
let validator = node.getAttachedValidator(epochRef, validator_index)
if validator != nil:
# the validator index and private key pair.
slotSigs.add validator.getSlotSignature(
fork, genesis_validators_root, slot)
slotSigsData.add (committee_index, validator_index, validator)
await allFutures(slotSigs)
doAssert slotSigsData.len == slotSigs.len
for i in 0..<slotSigs.len:
let
data = slotSigsData[i]
slotSig = slotSigs[i].read().valueOr:
warn "Unable to create slot signature",
validator = shortLog(data.v),
slot, error = error
continue
aggregateAndProof = makeAggregateAndProof(
node.attestationPool[], epochRef, slot, data.committee_index,
data.validator_index, slotSig).valueOr:
# Don't broadcast when, e.g., this validator isn't aggregator
continue
sig = block:
let res = await getAggregateAndProofSignature(data.v,
fork, genesis_validators_root, aggregateAndProof)
if res.isErr():
warn "Unable to sign aggregate",
validator = shortLog(data.v), error_msg = res.error()
return
res.get()
signedAP = SignedAggregateAndProof(
message: aggregateAndProof,
signature: sig)
let sendResult = await node.network.broadcastAggregateAndProof(signedAP)
if sendResult.isErr:
warn "Aggregated attestation failed to send",
error = sendResult.error()
return
# The subnet on which the attestations (should have) arrived
let
subnet_id = compute_subnet_for_attestation(
committees_per_slot, slot, data.committee_index)
notice "Aggregated attestation sent",
aggregate = shortLog(signedAP.message.aggregate),
aggregator_index = signedAP.message.aggregator_index,
signature = shortLog(signedAP.signature),
validator = shortLog(data.v),
subnet_id
node.validatorMonitor[].registerAggregate(
MsgSource.api, node.beaconClock.now(), signedAP.message,
get_attesting_indices(
epochRef, slot,
data.committee_index,
aggregateAndProof.aggregate.aggregation_bits))
asyncSpawn signAndSendAggregate(
node, validator, epochRef, slot, committee_index)
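The subnet id referenced in the removed logging above follows the phase0 validator spec's compute_subnet_for_attestation: the committee's position within the epoch, modulo the number of attestation subnets. A small sketch under mainnet constants (illustrative, not the nimbus implementation):

const
  SLOTS_PER_EPOCH = 32'u64          # mainnet preset
  ATTESTATION_SUBNET_COUNT = 64'u64 # phase0 network constant

func computeSubnetSketch(committeesPerSlot, slot, committeeIndex: uint64): uint64 =
  let
    slotsSinceEpochStart = slot mod SLOTS_PER_EPOCH
    committeesSinceEpochStart = committeesPerSlot * slotsSinceEpochStart
  (committeesSinceEpochStart + committeeIndex) mod ATTESTATION_SUBNET_COUNT

echo computeSubnetSketch(4, 100, 2)  # 18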
proc updateValidatorMetrics*(node: BeaconNode) =
# Technically, this only needs to be done on epoch transitions and if there's
@@ -1369,174 +1062,26 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
updateValidatorMetrics(node) # the important stuff is done, update the vanity numbers
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/phase0/validator.md#broadcast-aggregate
# If the validator is selected to aggregate (`is_aggregator`), then they
# broadcast their best aggregate as a `SignedAggregateAndProof` to the global
# aggregate channel (`beacon_aggregate_and_proof`) `2 / INTERVALS_PER_SLOT`
# of the way through the `slot`-that is,
# `SECONDS_PER_SLOT * 2 / INTERVALS_PER_SLOT` seconds after the start of `slot`.
if slot > 2:
doAssert slot.aggregate_deadline() == slot.sync_contribution_deadline()
let
aggregateCutoff = node.beaconClock.fromNow(slot.aggregate_deadline())
if aggregateCutoff.inFuture:
debug "Waiting to send aggregate attestations",
aggregateCutoff = shortLog(aggregateCutoff.offset)
await sleepAsync(aggregateCutoff.offset)
let sendAggregatedAttestationsFut =
sendAggregatedAttestations(node, head, slot)
let handleSyncCommitteeContributionsFut =
handleSyncCommitteeContributions(node, head, slot)
await handleSyncCommitteeContributionsFut
await sendAggregatedAttestationsFut
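As a quick sanity check on the timing in the comments above: with the mainnet SECONDS_PER_SLOT = 12 and the spec's INTERVALS_PER_SLOT = 3, the aggregate broadcast point and the "2 / 3 of the slot time" sync-contribution point are the same 8 seconds into the slot, which is what sharing a single cutoff between the two relies on:

const
  SECONDS_PER_SLOT = 12   # mainnet preset
  INTERVALS_PER_SLOT = 3  # phase0 spec constant

echo SECONDS_PER_SLOT * 2 div INTERVALS_PER_SLOT  # 8 - aggregate broadcast offset
echo SECONDS_PER_SLOT * 2 div 3                   # 8 - "2/3 of the slot" for contributions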
proc sendAttestation*(node: BeaconNode,
attestation: Attestation): Future[SendResult] {.async.} =
# REST helper procedure.
# https://github.com/ethereum/consensus-specs/blob/v1.2.0-rc.1/specs/altair/validator.md#broadcast-sync-committee-contribution
# Wait 2 / 3 of the slot time to allow messages to propagate, then collect
# the result in aggregates
static:
doAssert aggregateSlotOffset == syncContributionSlotOffset, "Timing change?"
let
target = node.dag.getBlockRef(attestation.data.target.root).valueOr:
notice "Attempt to send attestation for unknown target",
attestation = shortLog(attestation)
return SendResult.err(
"Attempt to send attestation for unknown block")
aggregateCutoff = node.beaconClock.fromNow(slot.aggregate_deadline())
if aggregateCutoff.inFuture:
debug "Waiting to send aggregate attestations",
aggregateCutoff = shortLog(aggregateCutoff.offset)
await sleepAsync(aggregateCutoff.offset)
epochRef = node.dag.getEpochRef(
target, attestation.data.target.epoch, false).valueOr:
warn "Cannot construct EpochRef for attestation, skipping send - report bug",
target = shortLog(target),
attestation = shortLog(attestation)
return
committee_index =
epochRef.get_committee_index(attestation.data.index).valueOr:
notice "Invalid committee index in attestation",
attestation = shortLog(attestation)
return SendResult.err("Invalid committee index in attestation")
subnet_id = compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), attestation.data.slot,
committee_index)
res = await node.sendAttestation(attestation, subnet_id,
checkSignature = true)
if not res.isOk():
return res
let sendAggregatedAttestationsFut =
sendAggregatedAttestations(node, head, slot)
let
wallTime = node.processor.getCurrentBeaconTime()
delay = wallTime - attestation.data.slot.attestation_deadline()
let handleSyncCommitteeContributionsFut =
handleSyncCommitteeContributions(node, head, slot)
notice "Attestation sent",
attestation = shortLog(attestation), delay, subnet_id
beacon_attestation_sent_delay.observe(delay.toFloatSeconds())
return SendResult.ok()
proc sendAggregateAndProof*(node: BeaconNode,
proof: SignedAggregateAndProof): Future[SendResult] {.
async.} =
# REST helper procedure.
let res =
await node.processor.aggregateValidator(MsgSource.api, proof)
return
if res.isGoodForSending:
let sendResult = await node.network.broadcastAggregateAndProof(proof)
if sendResult.isOk:
notice "Aggregated attestation sent",
attestation = shortLog(proof.message.aggregate),
aggregator_index = proof.message.aggregator_index,
signature = shortLog(proof.signature)
sendResult
else:
notice "Aggregated attestation failed validation",
proof = shortLog(proof.message.aggregate), error = res.error()
err(res.error()[1])
proc sendVoluntaryExit*(
node: BeaconNode, exit: SignedVoluntaryExit):
Future[SendResult] {.async.} =
# REST helper procedure.
let res =
node.processor[].voluntaryExitValidator(MsgSource.api, exit)
if res.isGoodForSending:
return await node.network.broadcastVoluntaryExit(exit)
else:
notice "Voluntary exit request failed validation",
exit = shortLog(exit.message), error = res.error()
return err(res.error()[1])
proc sendAttesterSlashing*(
node: BeaconNode, slashing: AttesterSlashing): Future[SendResult] {.async.} =
# REST helper procedure.
let res =
node.processor[].attesterSlashingValidator(MsgSource.api, slashing)
if res.isGoodForSending:
return await node.network.broadcastAttesterSlashing(slashing)
else:
notice "Attester slashing request failed validation",
slashing = shortLog(slashing), error = res.error()
return err(res.error()[1])
proc sendProposerSlashing*(
node: BeaconNode, slashing: ProposerSlashing): Future[SendResult]
{.async.} =
# REST helper procedure.
let res =
node.processor[].proposerSlashingValidator(MsgSource.api, slashing)
if res.isGoodForSending:
return await node.network.broadcastProposerSlashing(slashing)
else:
notice "Proposer slashing request failed validation",
slashing = shortLog(slashing), error = res.error()
return err(res.error()[1])
proc sendBeaconBlock*(node: BeaconNode, forked: ForkedSignedBeaconBlock
): Future[SendBlockResult] {.async.} =
# REST helper procedure.
block:
# Start with a quick gossip validation check such that broadcasting the
# block doesn't get the node into trouble
let res = withBlck(forked):
validateBeaconBlock(node.dag, node.quarantine, blck,
node.beaconClock.now(), {})
if not res.isGoodForSending():
return SendBlockResult.err(res.error()[1])
# The block passed basic gossip validation - we can "safely" broadcast it now.
# In fact, per the spec, we should broadcast it even if it later fails to
# apply to our state.
let sendResult = await node.network.broadcastBeaconBlock(forked)
if sendResult.isErr:
return SendBlockResult.err(sendResult.error())
let
wallTime = node.beaconClock.now()
accepted = withBlck(forked):
let newBlockRef = await node.blockProcessor.storeBlock(
MsgSource.api, wallTime, blck, payloadValid = true)
# The boolean we return tells the caller whether the block was integrated
# into the chain
if newBlockRef.isOk():
notice "Block published",
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
signature = shortLog(blck.signature)
true
else:
warn "Unable to add proposed block to block pool",
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
signature = shortLog(blck.signature), err = newBlockRef.error()
false
return SendBlockResult.ok(accepted)
proc registerDuty*(
node: BeaconNode, slot: Slot, subnet_id: SubnetId, vidx: ValidatorIndex,
isAggregator: bool) =
# Only register relevant duties
node.actionTracker.registerDuty(slot, subnet_id, vidx, isAggregator)
await handleSyncCommitteeContributionsFut
await sendAggregatedAttestationsFut
proc registerDuties*(node: BeaconNode, wallSlot: Slot) {.async.} =
## Register upcoming duties of attached validators with the duty tracker
@@ -1554,13 +1099,10 @@ proc registerDuties*(node: BeaconNode, wallSlot: Slot) {.async.} =
# be getting the duties one slot at a time
for slot in wallSlot ..< wallSlot + SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS:
let
epochRef = block:
let tmp = node.dag.getEpochRef(head, slot.epoch, false)
if tmp.isErr(): # Shouldn't happen
warn "Cannot construct EpochRef for duties - report bug",
head = shortLog(head), slot
return
tmp.get()
epochRef = node.dag.getEpochRef(head, slot.epoch, false).valueOr:
warn "Cannot construct EpochRef for duties - report bug",
head = shortLog(head), slot
return
let
fork = node.dag.forkAtEpoch(slot.epoch)
committees_per_slot = get_committee_count_per_slot(epochRef)
@@ -1583,4 +1125,5 @@ proc registerDuties*(node: BeaconNode, wallSlot: Slot) {.async.} =
continue
let isAggregator = is_aggregator(committee.lenu64, slotSigRes.get())
node.registerDuty(slot, subnet_id, validator_index, isAggregator)
node.actionTracker.registerDuty(
slot, subnet_id, validator_index, isAggregator)


@@ -126,8 +126,6 @@ type
AttestationSentMessage = object of LogMessage
attestation: AttestationObject
indexInCommittee: uint64
validator: string
AttestationReceivedMessage = object of LogMessage
attestation: AttestationObject
@@ -137,9 +135,7 @@ type
AggregatedAttestationSentMessage = object of LogMessage
attestation: AttestationObject
validator: string
signature: string
aggregationSlot: uint64
AggregatedAttestationReceivedMessage = object of LogMessage
aggregate: AttestationObject
@@ -155,21 +151,16 @@ type
ContributionObject = object
slot: uint64
beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string
subnetId: uint64
subcommittee_index: uint64
aggregationBits {.serializedFieldName: "aggregation_bits".}: string
ContributionMessageObject = object
aggregatorIndex {.serializedFieldName: "aggregator_index".}: uint64
contribution: ContributionObject
selectionProof {.serializedFieldName: "selection_proof".}: string
ContributionSentObject = object
message: ContributionMessageObject
contribution: ContributionObject
aggregatorIndex {.serializedFieldName: "aggregator_index".}: uint64
signature: string
SCMSentMessage = object of LogMessage
message: SyncCommitteeMessageObject
validator: string
SCMReceivedMessage = object of LogMessage
wallSlot: uint64
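The serializedFieldName pragma on these log-parsing types maps a Nim field onto the JSON key that appears in the log line, so camelCase fields can read snake_case logs. A minimal sketch with a made-up type (not part of logtrace; assumes nim-json-serialization):

import json_serialization

type
  ContributionLike = object
    beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string
    subcommittee_index: uint64

let decoded = Json.decode(
  """{"beacon_block_root":"0xabc","subcommittee_index":3}""", ContributionLike)
doAssert decoded.beaconBlockRoot == "0xabc"
doAssert decoded.subcommittee_index == 3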


@@ -90,19 +90,20 @@ suite "Attestation pool processing" & preset():
check:
# Added attestation, should get it back
      toSeq(pool[].attestations(none(Slot), none(CommitteeIndex))) ==
        @[attestation]
      toSeq(pool[].attestations(
        some(attestation.data.slot), none(CommitteeIndex))) == @[attestation]
      toSeq(pool[].attestations(
        some(attestation.data.slot), some(attestation.data.index.CommitteeIndex))) ==
        @[attestation]
      toSeq(pool[].attestations(none(Slot), some(attestation.data.index.CommitteeIndex))) ==
        @[attestation]
      toSeq(pool[].attestations(some(
        attestation.data.slot + 1), none(CommitteeIndex))) == []
      toSeq(pool[].attestations(
        none(Slot), some(CommitteeIndex(attestation.data.index + 1)))) == []

      toSeq(pool[].attestations(Opt.none(Slot), Opt.none(CommitteeIndex))) ==
        @[attestation]
      toSeq(pool[].attestations(
        Opt.some(attestation.data.slot), Opt.none(CommitteeIndex))) ==
        @[attestation]
      toSeq(pool[].attestations(
        Opt.some(attestation.data.slot), Opt.some(attestation.data.index.CommitteeIndex))) ==
        @[attestation]
      toSeq(pool[].attestations(Opt.none(Slot), Opt.some(attestation.data.index.CommitteeIndex))) ==
        @[attestation]
      toSeq(pool[].attestations(Opt.some(
        attestation.data.slot + 1), Opt.none(CommitteeIndex))) == []
      toSeq(pool[].attestations(
        Opt.none(Slot), Opt.some(CommitteeIndex(attestation.data.index + 1)))) == []
process_slots(
defaultRuntimeConfig, state[],
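The test updates above track the pool's move from std/options to Opt from stew/results; a small side-by-side sketch of the two call styles (values are illustrative only):

import std/options, stew/results

# std/options style, as the checks used to read
let a: Option[int] = some(42)
doAssert a.isSome and a.get == 42
doAssert none(int).isNone

# stew/results Opt style, as they read now
let b: Opt[int] = Opt.some(42)
doAssert b.isSome and b.get == 42
doAssert Opt.none(int).isNone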

vendor/nim-libp2p vendored

@@ -1 +1 @@
Subproject commit 5d7024f2e03b465eb5bdc8bfb1d45af44b7a6a3b
Subproject commit e6440c43c211d930d72d6189d4bca67fef7276b3

vendor/nim-stew vendored

@@ -1 +1 @@
Subproject commit f75c0a273aa34880ae7ac99e7e0c5d16b26ef166
Subproject commit 4cab7b08793d25c311efe88d54f948815643bc41