VC/NIMBUS-BN validator protection (#6329)

* Beacon node side implementation.

* Validator client side implementation.

* Address review comments and fix the test.

* Only 400 errors can be IndexedErrorMessage; 500 errors are always ErrorMessage (see the sketch below).

* Remove VC shutdown functionality.

* Remove magic constants.

* Make arguments more visible at call sites and remove default values.

* Address review comments.
Eugene Kabanov 2024-06-11 22:38:16 +03:00 committed by GitHub
parent 741075aada
commit 27664291c4
GPG Key ID: B5690EEEBB952194
11 changed files with 126 additions and 55 deletions
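
Since only 400 responses carry per-item failure details (see the commit notes above), the formatting rule introduced for them can be shown in isolation. A minimal standalone sketch, using simplified stand-in types rather than the real RestIndexedErrorMessage / RestIndexedErrorMessageItem, of how an indexed error is rendered:

import std/[sequtils, strutils]

type
  IndexedFailure = object        # stand-in for RestIndexedErrorMessageItem
    index: int
    message: string
  IndexedError = object          # stand-in for RestIndexedErrorMessage
    code: int
    message: string
    failures: seq[IndexedFailure]

proc getErrorMessage(msg: IndexedError): string =
  # Same formatting rule as the new getErrorMessage overload added in this
  # commit: append "index:message" pairs only when per-item failures exist.
  if msg.failures.len > 0:
    msg.message & ": [" &
      msg.failures.mapIt($it.index & ":" & it.message).join("; ") & "]"
  else:
    msg.message

when isMainModule:
  let e = IndexedError(code: 400, message: "Invalid attestation",
    failures: @[IndexedFailure(index: 2, message: "bad signature")])
  echo e.getErrorMessage()       # Invalid attestation: [2:bad signature]

A 500 response never takes this path; it keeps using the plain getErrorMessage(response) based on RestErrorMessage.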

View File

@@ -122,7 +122,7 @@ type
# ----------------------------------------------------------------
dag*: ChainDAGRef
attestationPool*: ref AttestationPool
validatorPool: ref ValidatorPool
validatorPool*: ref ValidatorPool
syncCommitteeMsgPool: ref SyncCommitteeMsgPool
lightClientPool: ref LightClientPool
@@ -366,7 +366,8 @@ proc checkForPotentialDoppelganger(
proc processAttestation*(
self: ref Eth2Processor, src: MsgSource,
attestation: phase0.Attestation | electra.Attestation, subnet_id: SubnetId,
checkSignature: bool = true): Future[ValidationRes] {.async: (raises: [CancelledError]).} =
checkSignature, checkValidator: bool
): Future[ValidationRes] {.async: (raises: [CancelledError]).} =
var wallTime = self.getCurrentBeaconTime()
let (afterGenesis, wallSlot) = wallTime.toSlot()
@@ -393,19 +394,26 @@ proc processAttestation*(
let (attester_index, sig) = v.get()
self[].checkForPotentialDoppelganger(attestation, [attester_index])
if checkValidator and (attester_index in self.validatorPool[]):
warn "A validator client has attempted to send an attestation from " &
"validator that is also managed by the beacon node",
validator_index = attester_index
errReject("An attestation could not be sent from a validator that is " &
"also managed by the beacon node")
else:
self[].checkForPotentialDoppelganger(attestation, [attester_index])
trace "Attestation validated"
self.attestationPool[].addAttestation(
attestation, [attester_index], sig, wallTime)
trace "Attestation validated"
self.attestationPool[].addAttestation(
attestation, [attester_index], sig, wallTime)
self.validatorMonitor[].registerAttestation(
src, wallTime, attestation, attester_index)
self.validatorMonitor[].registerAttestation(
src, wallTime, attestation, attester_index)
beacon_attestations_received.inc()
beacon_attestation_delay.observe(delay.toFloatSeconds())
beacon_attestations_received.inc()
beacon_attestation_delay.observe(delay.toFloatSeconds())
ok()
ok()
else:
debug "Dropping attestation", reason = $v.error
beacon_attestations_dropped.inc(1, [$v.error[0]])
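
For reference, the guard added above reduces to a single membership test. A minimal standalone sketch with a plain HashSet standing in for the beacon node's attached-validator pool (shouldReject and the int indices are illustrative, not the real API):

import std/sets

proc shouldReject(checkValidator: bool, attesterIndex: int,
                  attachedIndices: HashSet[int]): bool =
  # Mirrors the new branch in processAttestation: drop an attestation whose
  # attester is also managed by this beacon node, but only when the caller
  # asked for the check; gossip validation passes checkValidator = false.
  checkValidator and attesterIndex in attachedIndices

when isMainModule:
  let attached = toHashSet([7, 11])
  doAssert shouldReject(true, 7, attached)       # REST submission for an attached index
  doAssert not shouldReject(false, 7, attached)  # gossip path skips the check
  doAssert not shouldReject(true, 3, attached)   # index 3 is not attached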

View File

@@ -1771,7 +1771,8 @@ proc installMessageValidators(node: BeaconNode) =
): Future[ValidationResult] {.async: (raises: [CancelledError]).} =
return toValidationResult(
await node.processor.processAttestation(
MsgSource.gossip, attestation, subnet_id)))
MsgSource.gossip, attestation, subnet_id,
checkSignature = true, checkValidator = false)))
else:
for it in SubnetId:
closureScope: # Needed for inner `proc`; don't lift it out of loop.
@@ -1782,7 +1783,8 @@ proc installMessageValidators(node: BeaconNode) =
): Future[ValidationResult] {.async: (raises: [CancelledError]).} =
return toValidationResult(
await node.processor.processAttestation(
MsgSource.gossip, attestation, subnet_id)))
MsgSource.gossip, attestation, subnet_id,
checkSignature = true, checkValidator = false)))
# beacon_aggregate_and_proof
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.4/specs/phase0/p2p-interface.md#beacon_aggregate_and_proof

View File

@@ -911,10 +911,12 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
when consensusFork >= ConsensusFork.Deneb:
await node.router.routeSignedBeaconBlock(
forkyBlck, Opt.some(
forkyBlck.create_blob_sidecars(kzg_proofs, blobs)))
forkyBlck.create_blob_sidecars(kzg_proofs, blobs)),
checkValidator = true)
else:
await node.router.routeSignedBeaconBlock(
forkyBlck, Opt.none(seq[BlobSidecar]))
forkyBlck, Opt.none(seq[BlobSidecar]),
checkValidator = true)
if res.isErr():
return RestApiResponse.jsonError(
@@ -966,10 +968,12 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
when consensusFork >= ConsensusFork.Deneb:
await node.router.routeSignedBeaconBlock(
forkyBlck, Opt.some(
forkyBlck.create_blob_sidecars(kzg_proofs, blobs)))
forkyBlck.create_blob_sidecars(kzg_proofs, blobs)),
checkValidator = true)
else:
await node.router.routeSignedBeaconBlock(
forkyBlck, Opt.none(seq[BlobSidecar]))
forkyBlck, Opt.none(seq[BlobSidecar]),
checkValidator = true)
if res.isErr():
return RestApiResponse.jsonError(
@@ -1087,7 +1091,8 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
let res = withBlck(forked):
forkyBlck.root = hash_tree_root(forkyBlck.message)
await node.router.routeSignedBeaconBlock(
forkyBlck, Opt.none(seq[BlobSidecar]))
forkyBlck, Opt.none(seq[BlobSidecar]),
checkValidator = true)
if res.isErr():
return RestApiResponse.jsonError(

View File

@@ -720,16 +720,30 @@ template firstSuccessSequential*(
break
proc getErrorMessage*(response: RestPlainResponse): string =
let res = decodeBytes(RestErrorMessage, response.data,
response.contentType)
if res.isOk():
let errorObj = res.get()
if errorObj.stacktraces.isSome():
errorObj.message & ": [" & errorObj.stacktraces.get().join("; ") & "]"
else:
errorObj.message
let res =
decodeBytes(RestErrorMessage, response.data, response.contentType).valueOr:
return "Unable to decode error response: [" & $error & "]"
if res.stacktraces.isSome():
res.message & ": [" & res.stacktraces.get().join("; ") & "]"
else:
"Unable to decode error response: [" & $res.error & "]"
res.message
proc unpackErrorMessage*(response: RestPlainResponse): RestIndexedErrorMessage =
decodeBytes(RestIndexedErrorMessage, response.data,
response.contentType).valueOr:
let message = "Unable to decode error response: [" & $error & "]"
return RestIndexedErrorMessage(
code: -1,
message: message,
failures: default(seq[RestIndexedErrorMessageItem]))
proc getErrorMessage*(msg: RestIndexedErrorMessage): string =
if len(msg.failures) > 0:
msg.message & ": [" &
msg.failures.mapIt($it.index & ":" & it.message).join("; ") & "]"
else:
msg.message
template handleCommunicationError(): untyped {.dirty.} =
let failure = ApiNodeFailure.init(ApiFailure.Communication, RequestName,
@@ -761,6 +775,13 @@ template handle400(): untyped {.dirty.} =
node.updateStatus(RestBeaconNodeStatus.Incompatible, failure)
failures.add(failure)
template handle400Indexed(): untyped {.dirty.} =
let failure = ApiNodeFailure.init(ApiFailure.Invalid, RequestName,
strategy, node, response.status,
response.unpackErrorMessage().getErrorMessage())
node.updateStatus(RestBeaconNodeStatus.Incompatible, failure)
failures.add(failure)
template handle404(): untyped {.dirty.} =
let failure = ApiNodeFailure.init(ApiFailure.NotFound, RequestName,
strategy, node, response.status, response.getErrorMessage())
@@ -1522,7 +1543,7 @@ proc submitPoolAttestations*(
of 200:
ApiResponse[bool].ok(true)
of 400:
handle400()
handle400Indexed()
ApiResponse[bool].err(ResponseInvalidError)
of 500:
handle500()
@@ -1550,7 +1571,7 @@ proc submitPoolAttestations*(
of 200:
return true
of 400:
handle400()
handle400Indexed()
false
of 500:
handle500()
@@ -1597,7 +1618,7 @@ proc submitPoolSyncCommitteeSignature*(
of 200:
ApiResponse[bool].ok(true)
of 400:
handle400()
handle400Indexed()
ApiResponse[bool].err(ResponseInvalidError)
of 500:
handle500()
@@ -1626,7 +1647,7 @@ proc submitPoolSyncCommitteeSignature*(
of 200:
return true
of 400:
handle400()
handle400Indexed()
false
of 500:
handle500()
@@ -2284,7 +2305,7 @@ proc publishBlock*(
return true
of 202:
debug BlockBroadcasted, node = node,
blck = shortLog(ForkedSignedBeaconBlock.init(data))
blck = shortLog(ForkedSignedBeaconBlock.init(data))
return true
of 400:
handle400()

View File

@@ -87,9 +87,9 @@ proc pollForValidatorIndices*(service: DutiesServiceRef) {.async.} =
if validator.isNone():
missing.add(validatorLog(item.validator.pubkey, item.index))
else:
validator.get().updateValidator(Opt.some ValidatorAndIndex(
index: item.index,
validator: item.validator))
vc.attachedValidators[].updateValidator(validator.get(),
Opt.some ValidatorAndIndex(index: item.index,
validator: item.validator))
updated.add(validatorLog(item.validator.pubkey, item.index))
list.add(validator.get())

View File

@@ -152,7 +152,7 @@ proc addValidatorsFromWeb3Signer(
gasLimit = node.consensusManager[].getGasLimit(keystore.pubkey)
v = node.attachedValidators[].addValidator(keystore, feeRecipient,
gasLimit)
v.updateValidator(data)
node.attachedValidators[].updateValidator(v, data)
proc addValidators*(node: BeaconNode) {.async: (raises: [CancelledError]).} =
info "Loading validators", validatorsDir = node.config.validatorsDir(),
@@ -174,7 +174,7 @@ proc addValidators*(node: BeaconNode) {.async: (raises: [CancelledError]).} =
v = node.attachedValidators[].addValidator(keystore, feeRecipient,
gasLimit)
v.updateValidator(data)
node.attachedValidators[].updateValidator(v, data)
# We use `allFutures` because all failures are already reported as
# user-visible warnings in `queryValidatorsSource`.
@@ -363,10 +363,12 @@ proc createAndSendAttestation(node: BeaconNode,
res =
if consensusFork >= ConsensusFork.Electra:
await node.router.routeAttestation(
registered.toElectraAttestation(signature), subnet_id, checkSignature = false)
registered.toElectraAttestation(signature), subnet_id,
checkSignature = false, checkValidator = false)
else:
await node.router.routeAttestation(
registered.toAttestation(signature), subnet_id, checkSignature = false)
registered.toAttestation(signature), subnet_id,
checkSignature = false, checkValidator = false)
if not res.isOk():
return
@@ -1294,7 +1296,8 @@ proc proposeBlockAux(
else:
Opt.none(seq[BlobSidecar])
newBlockRef = (
await node.router.routeSignedBeaconBlock(signedBlock, blobsOpt)
await node.router.routeSignedBeaconBlock(signedBlock, blobsOpt,
checkValidator = false)
).valueOr:
return head # Errors logged in router
@@ -1871,7 +1874,7 @@ proc updateValidators(
let
v = node.attachedValidators[].getValidator(validators[i].pubkey).valueOr:
continue
v.index = Opt.some ValidatorIndex(i)
node.attachedValidators[].setValidatorIndex(v, ValidatorIndex(i))
node.dutyValidatorCount = validators.len
@@ -1881,10 +1884,12 @@ proc updateValidators(
# Activation epoch can change after index is assigned..
let index = validator.index.get()
if index < validators.lenu64:
validator.updateValidator(
node.attachedValidators[].updateValidator(
validator,
Opt.some(ValidatorAndIndex(
index: index, validator: validators[int index]
)))
))
)
proc handleFallbackAttestations(node: BeaconNode, lastSlot, slot: Slot) =
# Neither block proposal nor sync committee duties can be done in this

View File

@@ -1607,7 +1607,7 @@ proc addValidator*(
if not isNil(host.getValidatorAndIdxFn):
let data = host.getValidatorAndIdxFn(keystore.pubkey)
v.updateValidator(data)
host.validatorPool[].updateValidator(v, data)
proc generateDeposits*(cfg: RuntimeConfig,
rng: var HmacDrbgContext,

View File

@@ -84,13 +84,22 @@ template getCurrentBeaconTime(router: MessageRouter): BeaconTime =
type RouteBlockResult = Result[Opt[BlockRef], string]
proc routeSignedBeaconBlock*(
router: ref MessageRouter, blck: ForkySignedBeaconBlock,
blobsOpt: Opt[seq[BlobSidecar]]):
blobsOpt: Opt[seq[BlobSidecar]], checkValidator: bool):
Future[RouteBlockResult] {.async: (raises: [CancelledError]).} =
## Validate and broadcast beacon block, then add it to the block database
## Returns the new Head when block is added successfully to dag, none when
## block passes validation but is not added, and error otherwise
let wallTime = router[].getCurrentBeaconTime()
block:
let vindex = ValidatorIndex(blck.message.proposer_index)
if checkValidator and (vindex in router.processor.validatorPool[]):
warn "A validator client attempts to send a block from " &
"validator that is also manager by beacon node",
validator_index = vindex
return err("Block could not be sent from validator that is also " &
"managed by the beacon node")
# Start with a quick gossip validation check such that broadcasting the
# block doesn't get the node into trouble
block:
@@ -192,13 +201,14 @@ proc routeSignedBeaconBlock*(
proc routeAttestation*(
router: ref MessageRouter,
attestation: phase0.Attestation | electra.Attestation,
subnet_id: SubnetId, checkSignature: bool):
subnet_id: SubnetId, checkSignature, checkValidator: bool):
Future[SendResult] {.async: (raises: [CancelledError]).} =
## Process and broadcast attestation - processing will register it with
## the attestation pool
block:
let res = await router[].processor.processAttestation(
MsgSource.api, attestation, subnet_id, checkSignature)
MsgSource.api, attestation, subnet_id,
checkSignature = checkSignature, checkValidator = checkValidator)
if not res.isGoodForSending:
warn "Attestation failed validation",
@@ -249,7 +259,7 @@ proc routeAttestation*(
committee_index)
return await router.routeAttestation(
attestation, subnet_id, checkSignature = true)
attestation, subnet_id, checkSignature = true, checkValidator = true)
proc routeSignedAggregateAndProof*(
router: ref MessageRouter, proof: phase0.SignedAggregateAndProof,
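
The proposer check in routeSignedBeaconBlock above follows the same pattern as the attestation guard. A standalone sketch with simplified types (Option[string] instead of the codebase's Result type; checkProposer is a hypothetical helper):

import std/[options, sets]

proc checkProposer(checkValidator: bool, proposerIndex: int,
                   attachedIndices: HashSet[int]): Option[string] =
  # Returns a rejection reason when a REST-submitted block's proposer is a
  # validator this beacon node also manages; none() means routing proceeds.
  if checkValidator and proposerIndex in attachedIndices:
    some("A block cannot be sent from a validator that is also " &
         "managed by the beacon node")
  else:
    none(string)

when isMainModule:
  let attached = toHashSet([42])
  doAssert checkProposer(true, 42, attached).isSome()   # REST publish path rejects
  doAssert checkProposer(false, 42, attached).isNone()  # internal proposals pass

In the diff, checkValidator is true only on the REST publish paths (rest_beacon_api and the aggregated routeAttestation call), while the beacon node's own proposal paths (proposeBlockAux, unblindAndRouteBlockMEV) and the gossip validators pass false, so the node never rejects its own work.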

View File

@@ -143,7 +143,8 @@ proc unblindAndRouteBlockMEV*(
blck = shortLog(signedBlock)
let newBlockRef =
(await node.router.routeSignedBeaconBlock(signedBlock, blobsOpt)).valueOr:
(await node.router.routeSignedBeaconBlock(
signedBlock, blobsOpt, checkValidator = false)).valueOr:
# submitBlindedBlock has run, so don't allow fallback to run
return err("routeSignedBeaconBlock error") # Errors logged in router

View File

@@ -8,7 +8,7 @@
{.push raises: [].}
import
std/[tables, json, streams, sequtils, uri],
std/[tables, json, streams, sequtils, uri, sets],
chronos, chronicles, metrics,
json_serialization/std/net,
presto/client,
@@ -93,6 +93,7 @@ type
ValidatorPool* = object
validators*: Table[ValidatorPubKey, AttachedValidator]
indexSet*: HashSet[ValidatorIndex]
slashingProtection*: SlashingProtectionDB
doppelgangerDetectionEnabled*: bool
@@ -223,10 +224,24 @@ func contains*(pool: ValidatorPool, pubkey: ValidatorPubKey): bool =
## Returns ``true`` if validator with key ``pubkey`` present in ``pool``.
pool.validators.contains(pubkey)
proc contains*(pool: ValidatorPool, index: ValidatorIndex): bool =
## Returns ``true`` if validator with index ``index`` present in ``pool``.
pool.indexSet.contains(index)
proc setValidatorIndex*(pool: var ValidatorPool, validator: AttachedValidator,
index: ValidatorIndex) =
pool.indexSet.incl(index)
validator.index = Opt.some(index)
proc removeValidatorIndex(pool: var ValidatorPool, index: ValidatorIndex) =
pool.indexSet.excl(index)
proc removeValidator*(pool: var ValidatorPool, pubkey: ValidatorPubKey) =
## Delete validator with public key ``pubkey`` from ``pool``.
let validator = pool.validators.getOrDefault(pubkey)
if not(isNil(validator)):
if validator.index.isSome():
pool.removeValidatorIndex(validator.index.get)
pool.validators.del(pubkey)
case validator.kind
of ValidatorKind.Local:
@@ -243,8 +258,9 @@ proc removeValidator*(pool: var ValidatorPool, pubkey: ValidatorPubKey) =
func needsUpdate*(validator: AttachedValidator): bool =
validator.index.isNone() or validator.activationEpoch == FAR_FUTURE_EPOCH
proc updateValidator*(
validator: AttachedValidator, validatorData: Opt[ValidatorAndIndex]) =
proc updateValidator*(pool: var ValidatorPool,
validator: AttachedValidator,
validatorData: Opt[ValidatorAndIndex]) =
defer: validator.updated = true
let
@@ -259,6 +275,7 @@ proc updateValidator*(
## Update activation information for a validator
if validator.index != Opt.some data.index:
pool.setValidatorIndex(validator, data.index)
validator.index = Opt.some data.index
validator.validator = Opt.some data.validator
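
The new indexSet must stay in sync with the validators table, which is why updateValidator and setValidatorIndex are now pool-level procs rather than methods on the validator itself. A standalone sketch of that bookkeeping with simplified stand-in types (MiniPool and MiniValidator are illustrative, not the real ValidatorPool/AttachedValidator):

import std/[options, sets, tables]

type
  MiniValidator = ref object      # stand-in for AttachedValidator
    index: Option[int]
  MiniPool = object               # stand-in for ValidatorPool
    validators: Table[string, MiniValidator]
    indexSet: HashSet[int]

proc contains(pool: MiniPool, index: int): bool =
  # O(1) membership test the beacon node uses to recognize its own validators.
  pool.indexSet.contains(index)

proc setValidatorIndex(pool: var MiniPool, v: MiniValidator, index: int) =
  # Keep the set and the per-validator field in sync.
  pool.indexSet.incl(index)
  v.index = some(index)

proc removeValidator(pool: var MiniPool, pubkey: string) =
  let v = pool.validators.getOrDefault(pubkey)
  if v != nil:
    if v.index.isSome():
      pool.indexSet.excl(v.index.get())
    pool.validators.del(pubkey)

when isMainModule:
  var pool = MiniPool()
  let v = MiniValidator()
  pool.validators["0xabc"] = v
  pool.setValidatorIndex(v, 42)
  doAssert 42 in pool
  pool.removeValidator("0xabc")
  doAssert 42 notin pool

Assigning validator.index directly, as the old code in beacon_validators.nim did, would leave indexSet stale and silently defeat the contains(pool, index) check that the protection relies on.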

View File

@@ -71,13 +71,14 @@ func checkResponse(a, b: openArray[KeystoreData]): bool =
suite "Validator pool":
test "Doppelganger for genesis validator":
let
pool = newClone(ValidatorPool())
v = AttachedValidator(activationEpoch: FAR_FUTURE_EPOCH)
check:
not v.triggersDoppelganger(GENESIS_EPOCH) # no check
not v.doppelgangerReady(GENESIS_EPOCH.start_slot) # no activation
v.updateValidator(makeValidatorAndIndex(ValidatorIndex(1), GENESIS_EPOCH))
pool[].updateValidator(v, makeValidatorAndIndex(ValidatorIndex(1), GENESIS_EPOCH))
check:
not v.triggersDoppelganger(GENESIS_EPOCH) # no check
@@ -94,6 +95,7 @@ suite "Validator pool":
test "Doppelganger for validator that activates in same epoch as check":
let
pool = newClone(ValidatorPool())
v = AttachedValidator(activationEpoch: FAR_FUTURE_EPOCH)
now = Epoch(10).start_slot()
@@ -104,7 +106,7 @@ suite "Validator pool":
not v.doppelgangerReady(GENESIS_EPOCH.start_slot)
not v.doppelgangerReady(now)
v.updateValidator(makeValidatorAndIndex(ValidatorIndex(5), FAR_FUTURE_EPOCH))
pool[].updateValidator(v, makeValidatorAndIndex(ValidatorIndex(5), FAR_FUTURE_EPOCH))
check: # We still don't know when validator activates so we wouldn't trigger
not v.triggersDoppelganger(GENESIS_EPOCH)
@@ -113,7 +115,7 @@ suite "Validator pool":
not v.doppelgangerReady(GENESIS_EPOCH.start_slot)
not v.doppelgangerReady(now)
v.updateValidator(makeValidatorAndIndex(ValidatorIndex(5), now.epoch()))
pool[].updateValidator(v, makeValidatorAndIndex(ValidatorIndex(5), now.epoch()))
check: # No check done yet
not v.triggersDoppelganger(GENESIS_EPOCH)