diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 0fc759a3d..216c644b9 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -122,7 +122,7 @@ type # ---------------------------------------------------------------- dag*: ChainDAGRef attestationPool*: ref AttestationPool - validatorPool: ref ValidatorPool + validatorPool*: ref ValidatorPool syncCommitteeMsgPool: ref SyncCommitteeMsgPool lightClientPool: ref LightClientPool @@ -366,7 +366,8 @@ proc checkForPotentialDoppelganger( proc processAttestation*( self: ref Eth2Processor, src: MsgSource, attestation: phase0.Attestation | electra.Attestation, subnet_id: SubnetId, - checkSignature: bool = true): Future[ValidationRes] {.async: (raises: [CancelledError]).} = + checkSignature, checkValidator: bool +): Future[ValidationRes] {.async: (raises: [CancelledError]).} = var wallTime = self.getCurrentBeaconTime() let (afterGenesis, wallSlot) = wallTime.toSlot() @@ -393,19 +394,26 @@ proc processAttestation*( let (attester_index, sig) = v.get() - self[].checkForPotentialDoppelganger(attestation, [attester_index]) + if checkValidator and (attester_index in self.validatorPool[]): + warn "A validator client has attempted to send an attestation from " & + "validator that is also managed by the beacon node", + validator_index = attester_index + errReject("An attestation could not be sent from a validator that is " & + "also managed by the beacon node") + else: + self[].checkForPotentialDoppelganger(attestation, [attester_index]) - trace "Attestation validated" - self.attestationPool[].addAttestation( - attestation, [attester_index], sig, wallTime) + trace "Attestation validated" + self.attestationPool[].addAttestation( + attestation, [attester_index], sig, wallTime) - self.validatorMonitor[].registerAttestation( - src, wallTime, attestation, attester_index) + self.validatorMonitor[].registerAttestation( + src, wallTime, attestation, attester_index) - beacon_attestations_received.inc() - beacon_attestation_delay.observe(delay.toFloatSeconds()) + beacon_attestations_received.inc() + beacon_attestation_delay.observe(delay.toFloatSeconds()) - ok() + ok() else: debug "Dropping attestation", reason = $v.error beacon_attestations_dropped.inc(1, [$v.error[0]]) diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 185ff1b71..0079690dc 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -1771,7 +1771,8 @@ proc installMessageValidators(node: BeaconNode) = ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processAttestation( - MsgSource.gossip, attestation, subnet_id))) + MsgSource.gossip, attestation, subnet_id, + checkSignature = true, checkValidator = false))) else: for it in SubnetId: closureScope: # Needed for inner `proc`; don't lift it out of loop. @@ -1782,7 +1783,8 @@ proc installMessageValidators(node: BeaconNode) = ): Future[ValidationResult] {.async: (raises: [CancelledError]).} = return toValidationResult( await node.processor.processAttestation( - MsgSource.gossip, attestation, subnet_id))) + MsgSource.gossip, attestation, subnet_id, + checkSignature = true, checkValidator = false))) # beacon_aggregate_and_proof # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.4/specs/phase0/p2p-interface.md#beacon_aggregate_and_proof diff --git a/beacon_chain/rpc/rest_beacon_api.nim b/beacon_chain/rpc/rest_beacon_api.nim index a43a2db84..58aa5a881 100644 --- a/beacon_chain/rpc/rest_beacon_api.nim +++ b/beacon_chain/rpc/rest_beacon_api.nim @@ -911,10 +911,12 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = when consensusFork >= ConsensusFork.Deneb: await node.router.routeSignedBeaconBlock( forkyBlck, Opt.some( - forkyBlck.create_blob_sidecars(kzg_proofs, blobs))) + forkyBlck.create_blob_sidecars(kzg_proofs, blobs)), + checkValidator = true) else: await node.router.routeSignedBeaconBlock( - forkyBlck, Opt.none(seq[BlobSidecar])) + forkyBlck, Opt.none(seq[BlobSidecar]), + checkValidator = true) if res.isErr(): return RestApiResponse.jsonError( @@ -966,10 +968,12 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = when consensusFork >= ConsensusFork.Deneb: await node.router.routeSignedBeaconBlock( forkyBlck, Opt.some( - forkyBlck.create_blob_sidecars(kzg_proofs, blobs))) + forkyBlck.create_blob_sidecars(kzg_proofs, blobs)), + checkValidator = true) else: await node.router.routeSignedBeaconBlock( - forkyBlck, Opt.none(seq[BlobSidecar])) + forkyBlck, Opt.none(seq[BlobSidecar]), + checkValidator = true) if res.isErr(): return RestApiResponse.jsonError( @@ -1087,7 +1091,8 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = let res = withBlck(forked): forkyBlck.root = hash_tree_root(forkyBlck.message) await node.router.routeSignedBeaconBlock( - forkyBlck, Opt.none(seq[BlobSidecar])) + forkyBlck, Opt.none(seq[BlobSidecar]), + checkValidator = true) if res.isErr(): return RestApiResponse.jsonError( diff --git a/beacon_chain/validator_client/api.nim b/beacon_chain/validator_client/api.nim index 1e9b39316..d02254aae 100644 --- a/beacon_chain/validator_client/api.nim +++ b/beacon_chain/validator_client/api.nim @@ -720,16 +720,30 @@ template firstSuccessSequential*( break proc getErrorMessage*(response: RestPlainResponse): string = - let res = decodeBytes(RestErrorMessage, response.data, - response.contentType) - if res.isOk(): - let errorObj = res.get() - if errorObj.stacktraces.isSome(): - errorObj.message & ": [" & errorObj.stacktraces.get().join("; ") & "]" - else: - errorObj.message + let res = + decodeBytes(RestErrorMessage, response.data, response.contentType).valueOr: + return "Unable to decode error response: [" & $error & "]" + + if res.stacktraces.isSome(): + res.message & ": [" & res.stacktraces.get().join("; ") & "]" else: - "Unable to decode error response: [" & $res.error & "]" + res.message + +proc unpackErrorMessage*(response: RestPlainResponse): RestIndexedErrorMessage = + decodeBytes(RestIndexedErrorMessage, response.data, + response.contentType).valueOr: + let message = "Unable to decode error response: [" & $error & "]" + return RestIndexedErrorMessage( + code: -1, + message: message, + failures: default(seq[RestIndexedErrorMessageItem])) + +proc getErrorMessage*(msg: RestIndexedErrorMessage): string = + if len(msg.failures) > 0: + msg.message & ": [" & + msg.failures.mapIt($it.index & ":" & it.message).join("; ") & "]" + else: + msg.message template handleCommunicationError(): untyped {.dirty.} = let failure = ApiNodeFailure.init(ApiFailure.Communication, RequestName, @@ -761,6 +775,13 @@ template handle400(): untyped {.dirty.} = node.updateStatus(RestBeaconNodeStatus.Incompatible, failure) failures.add(failure) +template handle400Indexed(): untyped {.dirty.} = + let failure = ApiNodeFailure.init(ApiFailure.Invalid, RequestName, + strategy, node, response.status, + response.unpackErrorMessage().getErrorMessage()) + node.updateStatus(RestBeaconNodeStatus.Incompatible, failure) + failures.add(failure) + template handle404(): untyped {.dirty.} = let failure = ApiNodeFailure.init(ApiFailure.NotFound, RequestName, strategy, node, response.status, response.getErrorMessage()) @@ -1522,7 +1543,7 @@ proc submitPoolAttestations*( of 200: ApiResponse[bool].ok(true) of 400: - handle400() + handle400Indexed() ApiResponse[bool].err(ResponseInvalidError) of 500: handle500() @@ -1550,7 +1571,7 @@ proc submitPoolAttestations*( of 200: return true of 400: - handle400() + handle400Indexed() false of 500: handle500() @@ -1597,7 +1618,7 @@ proc submitPoolSyncCommitteeSignature*( of 200: ApiResponse[bool].ok(true) of 400: - handle400() + handle400Indexed() ApiResponse[bool].err(ResponseInvalidError) of 500: handle500() @@ -1626,7 +1647,7 @@ proc submitPoolSyncCommitteeSignature*( of 200: return true of 400: - handle400() + handle400Indexed() false of 500: handle500() @@ -2284,7 +2305,7 @@ proc publishBlock*( return true of 202: debug BlockBroadcasted, node = node, - blck = shortLog(ForkedSignedBeaconBlock.init(data)) + blck = shortLog(ForkedSignedBeaconBlock.init(data)) return true of 400: handle400() diff --git a/beacon_chain/validator_client/duties_service.nim b/beacon_chain/validator_client/duties_service.nim index 1214302a3..d05b7da28 100644 --- a/beacon_chain/validator_client/duties_service.nim +++ b/beacon_chain/validator_client/duties_service.nim @@ -87,9 +87,9 @@ proc pollForValidatorIndices*(service: DutiesServiceRef) {.async.} = if validator.isNone(): missing.add(validatorLog(item.validator.pubkey, item.index)) else: - validator.get().updateValidator(Opt.some ValidatorAndIndex( - index: item.index, - validator: item.validator)) + vc.attachedValidators[].updateValidator(validator.get(), + Opt.some ValidatorAndIndex(index: item.index, + validator: item.validator)) updated.add(validatorLog(item.validator.pubkey, item.index)) list.add(validator.get()) diff --git a/beacon_chain/validators/beacon_validators.nim b/beacon_chain/validators/beacon_validators.nim index 9dfb19f6d..328075535 100644 --- a/beacon_chain/validators/beacon_validators.nim +++ b/beacon_chain/validators/beacon_validators.nim @@ -152,7 +152,7 @@ proc addValidatorsFromWeb3Signer( gasLimit = node.consensusManager[].getGasLimit(keystore.pubkey) v = node.attachedValidators[].addValidator(keystore, feeRecipient, gasLimit) - v.updateValidator(data) + node.attachedValidators[].updateValidator(v, data) proc addValidators*(node: BeaconNode) {.async: (raises: [CancelledError]).} = info "Loading validators", validatorsDir = node.config.validatorsDir(), @@ -174,7 +174,7 @@ proc addValidators*(node: BeaconNode) {.async: (raises: [CancelledError]).} = v = node.attachedValidators[].addValidator(keystore, feeRecipient, gasLimit) - v.updateValidator(data) + node.attachedValidators[].updateValidator(v, data) # We use `allFutures` because all failures are already reported as # user-visible warnings in `queryValidatorsSource`. @@ -363,10 +363,12 @@ proc createAndSendAttestation(node: BeaconNode, res = if consensusFork >= ConsensusFork.Electra: await node.router.routeAttestation( - registered.toElectraAttestation(signature), subnet_id, checkSignature = false) + registered.toElectraAttestation(signature), subnet_id, + checkSignature = false, checkValidator = false) else: await node.router.routeAttestation( - registered.toAttestation(signature), subnet_id, checkSignature = false) + registered.toAttestation(signature), subnet_id, + checkSignature = false, checkValidator = false) if not res.isOk(): return @@ -1294,7 +1296,8 @@ proc proposeBlockAux( else: Opt.none(seq[BlobSidecar]) newBlockRef = ( - await node.router.routeSignedBeaconBlock(signedBlock, blobsOpt) + await node.router.routeSignedBeaconBlock(signedBlock, blobsOpt, + checkValidator = false) ).valueOr: return head # Errors logged in router @@ -1871,7 +1874,7 @@ proc updateValidators( let v = node.attachedValidators[].getValidator(validators[i].pubkey).valueOr: continue - v.index = Opt.some ValidatorIndex(i) + node.attachedValidators[].setValidatorIndex(v, ValidatorIndex(i)) node.dutyValidatorCount = validators.len @@ -1881,10 +1884,12 @@ proc updateValidators( # Activation epoch can change after index is assigned.. let index = validator.index.get() if index < validators.lenu64: - validator.updateValidator( + node.attachedValidators[].updateValidator( + validator, Opt.some(ValidatorAndIndex( index: index, validator: validators[int index] - ))) + )) + ) proc handleFallbackAttestations(node: BeaconNode, lastSlot, slot: Slot) = # Neither block proposal nor sync committee duties can be done in this diff --git a/beacon_chain/validators/keystore_management.nim b/beacon_chain/validators/keystore_management.nim index 9610c439d..243fed9d3 100644 --- a/beacon_chain/validators/keystore_management.nim +++ b/beacon_chain/validators/keystore_management.nim @@ -1607,7 +1607,7 @@ proc addValidator*( if not isNil(host.getValidatorAndIdxFn): let data = host.getValidatorAndIdxFn(keystore.pubkey) - v.updateValidator(data) + host.validatorPool[].updateValidator(v, data) proc generateDeposits*(cfg: RuntimeConfig, rng: var HmacDrbgContext, diff --git a/beacon_chain/validators/message_router.nim b/beacon_chain/validators/message_router.nim index 1e40c450e..c70adb743 100644 --- a/beacon_chain/validators/message_router.nim +++ b/beacon_chain/validators/message_router.nim @@ -84,13 +84,22 @@ template getCurrentBeaconTime(router: MessageRouter): BeaconTime = type RouteBlockResult = Result[Opt[BlockRef], string] proc routeSignedBeaconBlock*( router: ref MessageRouter, blck: ForkySignedBeaconBlock, - blobsOpt: Opt[seq[BlobSidecar]]): + blobsOpt: Opt[seq[BlobSidecar]], checkValidator: bool): Future[RouteBlockResult] {.async: (raises: [CancelledError]).} = ## Validate and broadcast beacon block, then add it to the block database ## Returns the new Head when block is added successfully to dag, none when ## block passes validation but is not added, and error otherwise let wallTime = router[].getCurrentBeaconTime() + block: + let vindex = ValidatorIndex(blck.message.proposer_index) + if checkValidator and (vindex in router.processor.validatorPool[]): + warn "A validator client attempts to send a block from " & + "validator that is also manager by beacon node", + validator_index = vindex + return err("Block could not be sent from validator that is also " & + "managed by the beacon node") + # Start with a quick gossip validation check such that broadcasting the # block doesn't get the node into trouble block: @@ -192,13 +201,14 @@ proc routeSignedBeaconBlock*( proc routeAttestation*( router: ref MessageRouter, attestation: phase0.Attestation | electra.Attestation, - subnet_id: SubnetId, checkSignature: bool): + subnet_id: SubnetId, checkSignature, checkValidator: bool): Future[SendResult] {.async: (raises: [CancelledError]).} = ## Process and broadcast attestation - processing will register the it with ## the attestation pool block: let res = await router[].processor.processAttestation( - MsgSource.api, attestation, subnet_id, checkSignature) + MsgSource.api, attestation, subnet_id, + checkSignature = checkSignature, checkValidator = checkValidator) if not res.isGoodForSending: warn "Attestation failed validation", @@ -249,7 +259,7 @@ proc routeAttestation*( committee_index) return await router.routeAttestation( - attestation, subnet_id, checkSignature = true) + attestation, subnet_id, checkSignature = true, checkValidator = true) proc routeSignedAggregateAndProof*( router: ref MessageRouter, proof: phase0.SignedAggregateAndProof, diff --git a/beacon_chain/validators/message_router_mev.nim b/beacon_chain/validators/message_router_mev.nim index 5abdc64ef..76a2c44e3 100644 --- a/beacon_chain/validators/message_router_mev.nim +++ b/beacon_chain/validators/message_router_mev.nim @@ -143,7 +143,8 @@ proc unblindAndRouteBlockMEV*( blck = shortLog(signedBlock) let newBlockRef = - (await node.router.routeSignedBeaconBlock(signedBlock, blobsOpt)).valueOr: + (await node.router.routeSignedBeaconBlock( + signedBlock, blobsOpt, checkValidator = false)).valueOr: # submitBlindedBlock has run, so don't allow fallback to run return err("routeSignedBeaconBlock error") # Errors logged in router diff --git a/beacon_chain/validators/validator_pool.nim b/beacon_chain/validators/validator_pool.nim index 52a4d0741..98df8a2f5 100644 --- a/beacon_chain/validators/validator_pool.nim +++ b/beacon_chain/validators/validator_pool.nim @@ -8,7 +8,7 @@ {.push raises: [].} import - std/[tables, json, streams, sequtils, uri], + std/[tables, json, streams, sequtils, uri, sets], chronos, chronicles, metrics, json_serialization/std/net, presto/client, @@ -93,6 +93,7 @@ type ValidatorPool* = object validators*: Table[ValidatorPubKey, AttachedValidator] + indexSet*: HashSet[ValidatorIndex] slashingProtection*: SlashingProtectionDB doppelgangerDetectionEnabled*: bool @@ -223,10 +224,24 @@ func contains*(pool: ValidatorPool, pubkey: ValidatorPubKey): bool = ## Returns ``true`` if validator with key ``pubkey`` present in ``pool``. pool.validators.contains(pubkey) +proc contains*(pool: ValidatorPool, index: ValidatorIndex): bool = + ## Returns ``true`` if validator with index ``index`` present in ``pool``. + pool.indexSet.contains(index) + +proc setValidatorIndex*(pool: var ValidatorPool, validator: AttachedValidator, + index: ValidatorIndex) = + pool.indexSet.incl(index) + validator.index = Opt.some(index) + +proc removeValidatorIndex(pool: var ValidatorPool, index: ValidatorIndex) = + pool.indexSet.excl(index) + proc removeValidator*(pool: var ValidatorPool, pubkey: ValidatorPubKey) = ## Delete validator with public key ``pubkey`` from ``pool``. let validator = pool.validators.getOrDefault(pubkey) if not(isNil(validator)): + if validator.index.isSome(): + pool.removeValidatorIndex(validator.index.get) pool.validators.del(pubkey) case validator.kind of ValidatorKind.Local: @@ -243,8 +258,9 @@ proc removeValidator*(pool: var ValidatorPool, pubkey: ValidatorPubKey) = func needsUpdate*(validator: AttachedValidator): bool = validator.index.isNone() or validator.activationEpoch == FAR_FUTURE_EPOCH -proc updateValidator*( - validator: AttachedValidator, validatorData: Opt[ValidatorAndIndex]) = +proc updateValidator*(pool: var ValidatorPool, + validator: AttachedValidator, + validatorData: Opt[ValidatorAndIndex]) = defer: validator.updated = true let @@ -259,6 +275,7 @@ proc updateValidator*( ## Update activation information for a validator if validator.index != Opt.some data.index: + pool.setValidatorIndex(validator, data.index) validator.index = Opt.some data.index validator.validator = Opt.some data.validator diff --git a/tests/test_validator_pool.nim b/tests/test_validator_pool.nim index a79c8dca8..da3802b91 100644 --- a/tests/test_validator_pool.nim +++ b/tests/test_validator_pool.nim @@ -71,13 +71,14 @@ func checkResponse(a, b: openArray[KeystoreData]): bool = suite "Validator pool": test "Doppelganger for genesis validator": let + pool = newClone(ValidatorPool()) v = AttachedValidator(activationEpoch: FAR_FUTURE_EPOCH) check: not v.triggersDoppelganger(GENESIS_EPOCH) # no check not v.doppelgangerReady(GENESIS_EPOCH.start_slot) # no activation - v.updateValidator(makeValidatorAndIndex(ValidatorIndex(1), GENESIS_EPOCH)) + pool[].updateValidator(v, makeValidatorAndIndex(ValidatorIndex(1), GENESIS_EPOCH)) check: not v.triggersDoppelganger(GENESIS_EPOCH) # no check @@ -94,6 +95,7 @@ suite "Validator pool": test "Doppelganger for validator that activates in same epoch as check": let + pool = newClone(ValidatorPool()) v = AttachedValidator(activationEpoch: FAR_FUTURE_EPOCH) now = Epoch(10).start_slot() @@ -104,7 +106,7 @@ suite "Validator pool": not v.doppelgangerReady(GENESIS_EPOCH.start_slot) not v.doppelgangerReady(now) - v.updateValidator(makeValidatorAndIndex(ValidatorIndex(5), FAR_FUTURE_EPOCH)) + pool[].updateValidator(v, makeValidatorAndIndex(ValidatorIndex(5), FAR_FUTURE_EPOCH)) check: # We still don't know when validator activates so we wouldn't trigger not v.triggersDoppelganger(GENESIS_EPOCH) @@ -113,7 +115,7 @@ suite "Validator pool": not v.doppelgangerReady(GENESIS_EPOCH.start_slot) not v.doppelgangerReady(now) - v.updateValidator(makeValidatorAndIndex(ValidatorIndex(5), now.epoch())) + pool[].updateValidator(v, makeValidatorAndIndex(ValidatorIndex(5), now.epoch())) check: # No check done yet not v.triggersDoppelganger(GENESIS_EPOCH)