From c3d3397843ac8b9f93c884fd9f787eec32723bb2 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Thu, 21 Jul 2022 19:54:07 +0300 Subject: [PATCH] VC: doppelganger protection (#3877) * Improve fallback_service. * Improve logging in fallback_service. * Apply signal handling for all stages. * Fix some logging statements. * Add doppelganger REST api endpoint. Add some structures to VC. * Add client API call implementation. * Initial fix & refactor onceToAll() Add doppelganger service. Add doppelganger helpers. * Add doppelganger checks. * Move doppelganger log messages to higher levels. * Fix firstSuccess(). * Bump chronos. * Post rebase fixes. * Proper chronos bump. * Address review comments. * Attempt to fix finalization test issue. * Fix nimbus_signing_node. * Mark validators which are added at GENESIS_SLOT in GENESIS_EPOCH as passed doppelganger validation. * Do not send empty requests to server. * Fix log statement. * Address review comments and re-raise cancellations. Co-authored-by: zah --- beacon_chain/conf.nim | 11 + .../attestation_pool.nim | 8 + beacon_chain/nimbus_signing_node.nim | 4 +- beacon_chain/nimbus_validator_client.nim | 22 +- beacon_chain/rpc/rest_nimbus_api.nim | 57 ++++- .../eth2_apis/eth2_rest_serialization.nim | 46 ++++ .../spec/eth2_apis/rest_beacon_client.nim | 4 +- .../spec/eth2_apis/rest_nimbus_calls.nim | 17 ++ beacon_chain/spec/eth2_apis/rest_types.nim | 6 + beacon_chain/validator_client/api.nim | 231 ++++++++++++++++++ .../validator_client/attestation_service.nim | 19 +- .../validator_client/block_service.nim | 7 + beacon_chain/validator_client/common.nim | 138 +++++++++++ .../validator_client/doppelganger_service.nim | 117 +++++++++ .../validator_client/duties_service.nim | 3 + beacon_chain/validators/validator_duties.nim | 14 +- beacon_chain/validators/validator_pool.nim | 23 +- 17 files changed, 700 insertions(+), 27 deletions(-) create mode 100644 beacon_chain/spec/eth2_apis/rest_nimbus_calls.nim create mode 100644 
beacon_chain/validator_client/doppelganger_service.nim diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index 52cde64fe..e60d4f608 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -767,6 +767,17 @@ type abbr: "d" name: "data-dir" .}: OutDir + doppelgangerDetection* {. + # TODO This description is shared between the BN and the VC. + # Extract it in a constant (confutils fix may be needed). + desc: "If enabled, the validator client prudently listens for 2 epochs " & + "for attestations from a validator with the same index " & + "(a doppelganger), before sending an attestation itself. This " & + "protects against slashing (due to double-voting) but means you " & + "will miss two attestations when restarting." + defaultValue: true + name: "doppelganger-detection" .}: bool + nonInteractive* {. desc: "Do not display interative prompts. Quit on missing configuration" name: "non-interactive" .}: bool diff --git a/beacon_chain/consensus_object_pools/attestation_pool.nim b/beacon_chain/consensus_object_pools/attestation_pool.nim index 53a04bcd2..6f21cfe6d 100644 --- a/beacon_chain/consensus_object_pools/attestation_pool.nim +++ b/beacon_chain/consensus_object_pools/attestation_pool.nim @@ -755,3 +755,11 @@ proc prune*(pool: var AttestationPool) = # If pruning fails, it's likely the result of a bug - this shouldn't happen # but we'll keep running hoping that the fork chocie will recover eventually error "Couldn't prune fork choice, bug?", err = v.error() + +proc validatorSeenAtEpoch*(pool: var AttestationPool, epoch: Epoch, + vindex: ValidatorIndex): bool = + if uint64(vindex) < lenu64(pool.nextAttestationEpoch): + let mark = pool.nextAttestationEpoch[vindex] + (mark.subnet > epoch) or (mark.aggregate > epoch) + else: + false diff --git a/beacon_chain/nimbus_signing_node.nim b/beacon_chain/nimbus_signing_node.nim index 17113e37e..3b99246a1 100644 --- a/beacon_chain/nimbus_signing_node.nim +++ b/beacon_chain/nimbus_signing_node.nim @@ -100,7 +100,9 
@@ proc initValidators(sn: var SigningNode): bool = for keystore in listLoadableKeystores(sn.config): case keystore.kind of KeystoreKind.Local: - sn.attachedValidators.addLocalValidator(keystore) + # Signing node is not supposed to know genesis time, so we just set + # `start_slot` to GENESIS_SLOT. + sn.attachedValidators.addLocalValidator(keystore, GENESIS_SLOT) publicKeyIdents.add("\"0x" & keystore.pubkey.toHex() & "\"") of KeystoreKind.Remote: error "Signing node do not support remote validators", diff --git a/beacon_chain/nimbus_validator_client.nim b/beacon_chain/nimbus_validator_client.nim index 83fd30f20..01657ce12 100644 --- a/beacon_chain/nimbus_validator_client.nim +++ b/beacon_chain/nimbus_validator_client.nim @@ -6,29 +6,34 @@ # at your option. This file may not be copied, modified, or distributed except according to those terms. import validator_client/[common, fallback_service, duties_service, attestation_service, fork_service, - sync_committee_service] + sync_committee_service, doppelganger_service] proc initGenesis(vc: ValidatorClientRef): Future[RestGenesis] {.async.} = info "Initializing genesis", nodes_count = len(vc.beaconNodes) var nodes = vc.beaconNodes while true: - var pending: seq[Future[RestResponse[GetGenesisResponse]]] + var pendingRequests: seq[Future[RestResponse[GetGenesisResponse]]] for node in nodes: debug "Requesting genesis information", endpoint = node - pending.add(node.client.getGenesis()) + pendingRequests.add(node.client.getGenesis()) try: - await allFutures(pending) + await allFutures(pendingRequests) except CancelledError as exc: + var pending: seq[Future[void]] debug "Genesis information request was interrupted" + for future in pendingRequests: + if not(future.finished()): + pending.add(future.cancelAndWait()) + await allFutures(pending) raise exc let (errorNodes, genesisList) = block: var gres: seq[RestGenesis] var bres: seq[BeaconNodeServerRef] - for i in 0 ..< len(pending): - let fut = pending[i] + for i in 0 ..< 
len(pendingRequests): + let fut = pendingRequests[i] if fut.done(): let resp = fut.read() if resp.status == 200: @@ -87,7 +92,7 @@ proc initValidators(vc: ValidatorClientRef): Future[bool] {.async.} = continue else: duplicates.add(pubkey) - vc.attachedValidators.addLocalValidator(keystore) + vc.addValidator(keystore) return true proc initClock(vc: ValidatorClientRef): Future[BeaconClock] {.async.} = @@ -157,6 +162,7 @@ proc asyncInit(vc: ValidatorClientRef) {.async.} = vc.fallbackService = await FallbackServiceRef.init(vc) vc.forkService = await ForkServiceRef.init(vc) vc.dutiesService = await DutiesServiceRef.init(vc) + vc.doppelgangerService = await DoppelgangerServiceRef.init(vc) vc.attestationService = await AttestationServiceRef.init(vc) vc.syncCommitteeService = await SyncCommitteeServiceRef.init(vc) except CancelledError: @@ -168,6 +174,7 @@ proc asyncRun(vc: ValidatorClientRef) {.async.} = vc.fallbackService.start() vc.forkService.start() vc.dutiesService.start() + vc.doppelgangerService.start() vc.attestationService.start() vc.syncCommitteeService.start() @@ -195,6 +202,7 @@ proc asyncRun(vc: ValidatorClientRef) {.async.} = pending.add(vc.fallbackService.stop()) pending.add(vc.forkService.stop()) pending.add(vc.dutiesService.stop()) + pending.add(vc.doppelgangerService.stop()) pending.add(vc.attestationService.stop()) pending.add(vc.syncCommitteeService.stop()) await allFutures(pending) diff --git a/beacon_chain/rpc/rest_nimbus_api.nim b/beacon_chain/rpc/rest_nimbus_api.nim index 4f89ac5e7..592dc480d 100644 --- a/beacon_chain/rpc/rest_nimbus_api.nim +++ b/beacon_chain/rpc/rest_nimbus_api.nim @@ -13,7 +13,7 @@ import ./rest_utils, ../eth1/eth1_monitor, ../validators/validator_duties, - ../spec/forks, + ../spec/[forks, beacon_time], ../beacon_node, ../nimbus_binary_common export rest_utils @@ -263,6 +263,61 @@ proc installNimbusApiHandlers*(router: var RestRouter, node: BeaconNode) = return RestApiResponse.jsonError(Http503, "Compile with 
'-d:chronosFutureTracking' to get this request working") + router.api(MethodPost, "/nimbus/v1/validator/activity/{epoch}") do ( + epoch: Epoch, contentBody: Option[ContentBody]) -> RestApiResponse: + let indexList = + block: + if contentBody.isNone(): + return RestApiResponse.jsonError(Http400, EmptyRequestBodyError) + let dres = decodeBody(seq[RestValidatorIndex], contentBody.get()) + if dres.isErr(): + return RestApiResponse.jsonError(Http400, + InvalidValidatorIndexValueError, + $dres.error()) + var res: HashSet[ValidatorIndex] + let items = dres.get() + for item in items: + let vres = item.toValidatorIndex() + if vres.isErr(): + case vres.error() + of ValidatorIndexError.TooHighValue: + return RestApiResponse.jsonError(Http400, + TooHighValidatorIndexValueError) + of ValidatorIndexError.UnsupportedValue: + return RestApiResponse.jsonError(Http500, + UnsupportedValidatorIndexValueError) + res.incl(vres.get()) + if len(res) == 0: + return RestApiResponse.jsonError(Http400, + EmptyValidatorIndexArrayError) + res + let qepoch = + block: + if epoch.isErr(): + return RestApiResponse.jsonError(Http400, InvalidEpochValueError, + $epoch.error()) + let + res = epoch.get() + wallEpoch = node.currentSlot().epoch() + nextEpoch = + if wallEpoch == FAR_FUTURE_EPOCH: + wallEpoch + else: + wallEpoch + 1 + prevEpoch = get_previous_epoch(wallEpoch) + if (res < prevEpoch) or (res > nextEpoch): + return RestApiResponse.jsonError(Http400, InvalidEpochValueError, + "Requested epoch is more than one epoch from current epoch") + res + let response = indexList.mapIt( + RestActivityItem( + index: it, + epoch: qepoch, + active: node.attestationPool[].validatorSeenAtEpoch(qepoch, it) + ) + ) + return RestApiResponse.jsonResponse(response) + router.api(MethodGet, "/nimbus/v1/debug/gossip/peers") do ( ) -> RestApiResponse: diff --git a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim index 195b91d13..ae219021e 100644 --- 
a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim +++ b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim @@ -2097,6 +2097,52 @@ proc readValue*(reader: var JsonReader[RestJson], keystores: keystores, passwords: passwords, slashing_protection: slashing ) +proc writeValue*(writer: var JsonWriter[RestJson], + value: RestActivityItem) {. + raises: [IOError, Defect].} = + writer.beginRecord() + writer.writeField("index", value.index) + writer.writeField("epoch", value.epoch) + writer.writeField("active", value.active) + writer.endRecord() + +proc readValue*(reader: var JsonReader[RestJson], + value: var RestActivityItem) {. + raises: [SerializationError, IOError, Defect].} = + var index: Option[ValidatorIndex] + var epoch: Option[Epoch] + var active: Option[bool] + + for fieldName in readObjectFields(reader): + case fieldName + of "index": + if index.isSome(): + reader.raiseUnexpectedField( + "Multiple `index` fields found", "RestActivityItem") + index = some(reader.readValue(ValidatorIndex)) + of "epoch": + if epoch.isSome(): + reader.raiseUnexpectedField( + "Multiple `epoch` fields found", "RestActivityItem") + epoch = some(reader.readValue(Epoch)) + of "active": + if active.isSome(): + reader.raiseUnexpectedField( + "Multiple `active` fields found", "RestActivityItem") + active = some(reader.readValue(bool)) + else: + discard + + if index.isNone(): + reader.raiseUnexpectedValue("Missing or empty `index` value") + if epoch.isNone(): + reader.raiseUnexpectedValue("Missing or empty `epoch` value") + if active.isNone(): + reader.raiseUnexpectedValue("Missing or empty `active` value") + + value = RestActivityItem(index: index.get(), epoch: epoch.get(), + active: active.get()) + proc dump*(value: KeystoresAndSlashingProtection): string {. 
raises: [IOError, Defect].} = var stream = memoryOutput() diff --git a/beacon_chain/spec/eth2_apis/rest_beacon_client.nim b/beacon_chain/spec/eth2_apis/rest_beacon_client.nim index 320da6ade..e0d6e58b5 100644 --- a/beacon_chain/spec/eth2_apis/rest_beacon_client.nim +++ b/beacon_chain/spec/eth2_apis/rest_beacon_client.nim @@ -11,11 +11,11 @@ import "."/[ rest_beacon_calls, rest_config_calls, rest_debug_calls, rest_node_calls, rest_validator_calls, rest_keymanager_calls, - rest_common + rest_nimbus_calls, rest_common ] export chronos, client, rest_beacon_calls, rest_config_calls, rest_debug_calls, rest_node_calls, rest_validator_calls, rest_keymanager_calls, - rest_common + rest_nimbus_calls, rest_common diff --git a/beacon_chain/spec/eth2_apis/rest_nimbus_calls.nim b/beacon_chain/spec/eth2_apis/rest_nimbus_calls.nim new file mode 100644 index 000000000..8f9c52b25 --- /dev/null +++ b/beacon_chain/spec/eth2_apis/rest_nimbus_calls.nim @@ -0,0 +1,17 @@ +# Copyright (c) 2018-2022 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + +import + chronos, presto/client, + "."/[rest_types, eth2_rest_serialization] + +proc getValidatorsActivity*(epoch: Epoch, + body: seq[ValidatorIndex] + ): RestPlainResponse {. 
+ rest, endpoint: "/nimbus/v1/validator/activity/{epoch}", + meth: MethodPost.} diff --git a/beacon_chain/spec/eth2_apis/rest_types.nim b/beacon_chain/spec/eth2_apis/rest_types.nim index d24ad7d3c..25b6d5be7 100644 --- a/beacon_chain/spec/eth2_apis/rest_types.nim +++ b/beacon_chain/spec/eth2_apis/rest_types.nim @@ -234,6 +234,11 @@ type discovery_addresses*: seq[string] metadata*: RestMetadata + RestActivityItem* = object + index*: ValidatorIndex + epoch*: Epoch + active*: bool + RestPublishedSignedBeaconBlock* = distinct ForkedSignedBeaconBlock RestPublishedBeaconBlock* = distinct ForkedBeaconBlock @@ -569,6 +574,7 @@ type ProduceBlockResponseV2* = ForkedBeaconBlock ProduceSyncCommitteeContributionResponse* = DataEnclosedObject[SyncCommitteeContribution] SubmitBlindedBlockResponse* = DataEnclosedObject[bellatrix.ExecutionPayload] + GetValidatorsActivityResponse* = DataEnclosedObject[seq[RestActivityItem]] func `==`*(a, b: RestValidatorIndex): bool = uint64(a) == uint64(b) diff --git a/beacon_chain/validator_client/api.nim b/beacon_chain/validator_client/api.nim index ad15b85f6..426f961d2 100644 --- a/beacon_chain/validator_client/api.nim +++ b/beacon_chain/validator_client/api.nim @@ -10,6 +10,136 @@ type ApiOperation = enum Success, Timeout, Failure, Interrupt + ApiNodeResponse*[T] = object + node*: BeaconNodeServerRef + data*: ApiResponse[T] + + ApiResponseSeq*[T] = object + status*: ApiOperation + data*: seq[ApiNodeResponse[T]] + +template onceToAll*(vc: ValidatorClientRef, responseType: typedesc, + timeout: Duration, + body: untyped): ApiResponseSeq[responseType] = + var it {.inject.}: RestClientRef + type BodyType = typeof(body) + + var timerFut = + if timeout != InfiniteDuration: + sleepAsync(timeout) + else: + nil + + let onlineNodes = + try: + if not isNil(timerFut): + await vc.waitOnlineNodes(timerFut) + vc.onlineNodes() + except CancelledError as exc: + var default: seq[BeaconNodeServerRef] + if not(isNil(timerFut)) and not(timerFut.finished()): + await 
timerFut.cancelAndWait() + raise exc + except CatchableError as exc: + # This case could not be happened. + error "Unexpected exception while waiting for beacon nodes", + err_name = $exc.name, err_msg = $exc.msg + var default: seq[BeaconNodeServerRef] + default + + if len(onlineNodes) == 0: + # Timeout exceeded or operation was cancelled + ApiResponseSeq[responseType](status: ApiOperation.Timeout) + else: + let (pendingRequests, pendingNodes) = + block: + var requests: seq[BodyType] + var nodes: seq[BeaconNodeServerRef] + for node {.inject.} in onlineNodes: + it = node.client + let fut = body + requests.add(fut) + nodes.add(node) + (requests, nodes) + + let status = + try: + if isNil(timerFut): + await allFutures(pendingRequests) + ApiOperation.Success + else: + let waitFut = allFutures(pendingRequests) + discard await race(waitFut, timerFut) + if not(waitFut.finished()): + await waitFut.cancelAndWait() + ApiOperation.Timeout + else: + if not(timerFut.finished()): + await timerFut.cancelAndWait() + ApiOperation.Success + except CancelledError as exc: + # We should cancel all the pending requests and timer before we return + # result. + var pendingCancel: seq[Future[void]] + for fut in pendingRequests: + if not(fut.finished()): + pendingCancel.add(fut.cancelAndWait()) + if not(isNil(timerFut)) and not(timerFut.finished()): + pendingCancel.add(timerFut.cancelAndWait()) + await allFutures(pendingCancel) + raise exc + except CatchableError: + # This should not be happened, because allFutures() and race() did not + # raise any exceptions. 
+ ApiOperation.Failure + + let responses = + block: + var res: seq[ApiNodeResponse[responseType]] + for idx, pnode in pendingNodes.pairs(): + let apiResponse = + block: + let fut = pendingRequests[idx] + if fut.finished(): + if fut.failed() or fut.cancelled(): + let exc = fut.readError() + ApiNodeResponse[responseType]( + node: pnode, + data: ApiResponse[responseType].err("[" & $exc.name & "] " & + $exc.msg) + ) + else: + ApiNodeResponse[responseType]( + node: pnode, + data: ApiResponse[responseType].ok(fut.read()) + ) + else: + case status + of ApiOperation.Interrupt: + pendingNodes[idx].status = RestBeaconNodeStatus.Offline + ApiNodeResponse[responseType]( + node: pnode, + data: ApiResponse[responseType].err("Operation interrupted") + ) + of ApiOperation.Timeout: + pendingNodes[idx].status = RestBeaconNodeStatus.Offline + ApiNodeResponse[responseType]( + node: pnode, + data: ApiResponse[responseType].err( + "Operation timeout exceeded") + ) + of ApiOperation.Success, ApiOperation.Failure: + # This should not be happened, because all Futures should be + # finished, and `Failure` processed when Future is finished. 
+ ApiNodeResponse[responseType]( + node: pnode, + data: ApiResponse[responseType].err("Unexpected error") + ) + res.add(apiResponse) + res + + ApiResponseSeq[responseType](status: status, data: responses) + template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc, timeout: Duration, body: untyped, handlers: untyped): untyped = @@ -822,3 +952,104 @@ proc prepareSyncCommitteeSubnets*( raise newException(ValidatorApiError, "Unable to prepare sync committee subnet") + +proc getValidatorsActivity*( + vc: ValidatorClientRef, epoch: Epoch, + validators: seq[ValidatorIndex] + ): Future[GetValidatorsActivityResponse] {.async.} = + logScope: request = "getValidatorsActivity" + let resp = vc.onceToAll(RestPlainResponse, SlotDuration, + getValidatorsActivity(it, epoch, validators)) + case resp.status + of ApiOperation.Timeout: + debug "Unable to perform validator's activity request in time", + timeout = SlotDuration + return GetValidatorsActivityResponse() + of ApiOperation.Interrupt: + debug "Validator's activity request was interrupted" + return GetValidatorsActivityResponse() + of ApiOperation.Failure: + debug "Unexpected error happened while receiving validator's activity" + return GetValidatorsActivityResponse() + of ApiOperation.Success: + var activities: seq[RestActivityItem] + for apiResponse in resp.data: + if apiResponse.data.isErr(): + debug "Unable to retrieve validators activity data", + endpoint = apiResponse.node, error = apiResponse.data.error() + else: + let + response = apiResponse.data.get() + activity = + block: + var default: seq[RestActivityItem] + case response.status + of 200: + let res = decodeBytes(GetValidatorsActivityResponse, + response.data, response.contentType) + if res.isOk(): + let list = res.get().data + if len(list) != len(validators): + debug "Received incomplete validators activity response", + endpoint = apiResponse.node, + validators_count = len(validators), + activities_count = len(list) + default + else: + let isOrdered 
= + block: + var res = true + for index in 0 ..< len(validators): + if list[index].index != validators[index]: + res = false + break + res + if not(isOrdered): + debug "Received unordered validators activity response", + endpoint = apiResponse.node, + validators_count = len(validators), + activities_count = len(list) + default + else: + debug "Received validators activity response", + endpoint = apiResponse.node, + validators_count = len(validators), + activities_count = len(list) + list + else: + debug "Received invalid/incomplete response", + endpoint = apiResponse.node, error_message = res.error() + apiResponse.node.status = RestBeaconNodeStatus.Incompatible + default + of 400: + debug "Server reports invalid request", + response_code = response.status, + endpoint = apiResponse.node, + response_error = response.getGenericErrorMessage() + apiResponse.node.status = RestBeaconNodeStatus.Incompatible + default + of 500: + debug "Server reports internal error", + response_code = response.status, + endpoint = apiResponse.node, + response_error = response.getGenericErrorMessage() + apiResponse.node.status = RestBeaconNodeStatus.Offline + default + else: + debug "Server reports unexpected error code", + response_code = response.status, + endpoint = apiResponse.node, + response_error = response.getGenericErrorMessage() + apiResponse.node.status = RestBeaconNodeStatus.Offline + default + + if len(activity) > 0: + if len(activities) == 0: + activities = activity + else: + # If single node returns `active` it means that validator's + # activity was seen by this node, so result would be `active`. 
+ for index in 0 ..< len(activities): + if activity[index].active: + activities[index].active = true + return GetValidatorsActivityResponse(data: activities) diff --git a/beacon_chain/validator_client/attestation_service.nim b/beacon_chain/validator_client/attestation_service.nim index e29e41533..d2de911e1 100644 --- a/beacon_chain/validator_client/attestation_service.nim +++ b/beacon_chain/validator_client/attestation_service.nim @@ -31,6 +31,15 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData, res.get() let fork = vc.forkAtEpoch(adata.slot.epoch) + doAssert(validator.index.isSome()) + let vindex = validator.index.get() + + if not(vc.doppelgangerCheck(validator)): + info "Attestation has not been served (doppelganger check still active)", + slot = duty.data.slot, validator = shortLog(validator), + validator_index = vindex + return false + # TODO: signing_root is recomputed in getAttestationSignature just after, # but not for locally attached validators. let signingRoot = @@ -38,7 +47,6 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData, fork, vc.beaconGenesis.genesis_validators_root, adata) let attestationRoot = adata.hash_tree_root() - let vindex = validator.index.get() let notSlashable = vc.attachedValidators.slashingProtection .registerAttestation(vindex, validator.pubkey, adata.source.epoch, @@ -120,8 +128,15 @@ proc serveAggregateAndProof*(service: AttestationServiceRef, vc = service.client genesisRoot = vc.beaconGenesis.genesis_validators_root slot = proof.aggregate.data.slot - fork = vc.forkAtEpoch(slot.epoch) vindex = validator.index.get() + fork = vc.forkAtEpoch(slot.epoch) + + if not(vc.doppelgangerCheck(validator)): + info "Aggregate attestation has not been served " & + "(doppelganger check still active)", + slot = slot, validator = shortLog(validator), + validator_index = vindex + return false debug "Signing aggregate", validator = shortLog(validator), attestation = shortLog(proof.aggregate), 
fork = fork diff --git a/beacon_chain/validator_client/block_service.nim b/beacon_chain/validator_client/block_service.nim index 026704509..473e361a0 100644 --- a/beacon_chain/validator_client/block_service.nim +++ b/beacon_chain/validator_client/block_service.nim @@ -14,6 +14,13 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot, else: defaultGraffitiBytes() fork = vc.forkAtEpoch(slot.epoch) + vindex = validator.index.get() + + if not(vc.doppelgangerCheck(validator)): + info "Block has not been produced (doppelganger check still active)", + slot = slot, validator = shortLog(validator), + validator_index = vindex + return debug "Publishing block", validator = shortLog(validator), delay = vc.getDelay(slot.block_deadline()), diff --git a/beacon_chain/validator_client/common.nim b/beacon_chain/validator_client/common.nim index b41b63894..89aa99289 100644 --- a/beacon_chain/validator_client/common.nim +++ b/beacon_chain/validator_client/common.nim @@ -63,6 +63,9 @@ type SyncCommitteeServiceRef* = ref object of ClientServiceRef + DoppelgangerServiceRef* = ref object of ClientServiceRef + enabled*: bool + DutyAndProof* = object epoch*: Epoch dependentRoot*: Eth2Digest @@ -116,6 +119,22 @@ type SyncCommitteeDutiesMap* = Table[ValidatorPubKey, EpochSyncDuties] ProposerMap* = Table[Epoch, ProposedData] + DoppelgangerStatus* {.pure.} = enum + None, Checking, Passed + + DoppelgangerAttempt* {.pure.} = enum + None, Failure, SuccessTrue, SuccessFalse + + DoppelgangerState* = object + startEpoch*: Epoch + epochsCount*: uint64 + lastAttempt*: DoppelgangerAttempt + status*: DoppelgangerStatus + + DoppelgangerDetection* = object + startSlot*: Slot + validators*: Table[ValidatorIndex, DoppelgangerState] + ValidatorClient* = object config*: ValidatorClientConf graffitiBytes*: GraffitiBytes @@ -126,11 +145,13 @@ type attestationService*: AttestationServiceRef blockService*: BlockServiceRef syncCommitteeService*: SyncCommitteeServiceRef + doppelgangerService*: 
DoppelgangerServiceRef runSlotLoopFut*: Future[void] sigintHandleFut*: Future[void] sigtermHandleFut*: Future[void] beaconClock*: BeaconClock attachedValidators*: ValidatorPool + doppelgangerDetection*: DoppelgangerDetection forks*: seq[Fork] forksAvailable*: AsyncEvent nodesAvailable*: AsyncEvent @@ -346,3 +367,120 @@ proc forkAtEpoch*(vc: ValidatorClientRef, epoch: Epoch): Fork = proc getSubcommitteeIndex*(index: IndexInSyncCommittee): SyncSubcommitteeIndex = SyncSubcommitteeIndex(uint16(index) div SYNC_SUBCOMMITTEE_SIZE) + +proc currentSlot*(vc: ValidatorClientRef): Slot = + vc.beaconClock.now().slotOrZero() + +proc addDoppelganger*(vc: ValidatorClientRef, validator: AttachedValidator) = + logScope: + validator = shortLog(validator) + + if vc.config.doppelgangerDetection: + let + vindex = validator.index.get() + startEpoch = vc.currentSlot().epoch() + state = + if (startEpoch == GENESIS_EPOCH) and + (validator.startSlot == GENESIS_SLOT): + DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64, + lastAttempt: DoppelgangerAttempt.None, + status: DoppelgangerStatus.Passed) + else: + DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64, + lastAttempt: DoppelgangerAttempt.None, + status: DoppelgangerStatus.Checking) + res = vc.doppelgangerDetection.validators.hasKeyOrPut(vindex, state) + + if res: + warn "Validator is already in doppelganger table", + validator_index = vindex, start_epoch = startEpoch, + start_slot = validator.startSlot + else: + if state.status == DoppelgangerStatus.Checking: + info "Doppelganger protection activated", validator_index = vindex, + start_epoch = startEpoch, start_slot = validator.startSlot + else: + info "Doppelganger protection skipped", validator_index = vindex, + start_epoch = startEpoch, start_slot = validator.startSlot + +proc removeDoppelganger*(vc: ValidatorClientRef, index: ValidatorIndex) = + if vc.config.doppelgangerDetection: + var state: DoppelgangerState + # We do not care about race condition, when 
validator is not yet added to + # the doppelganger's table, but it should be removed. + discard vc.doppelgangerDetection.validators.pop(index, state) + +proc addValidator*(vc: ValidatorClientRef, keystore: KeystoreData) = + let slot = vc.currentSlot() + case keystore.kind + of KeystoreKind.Local: + vc.attachedValidators.addLocalValidator(keystore, none[ValidatorIndex](), + slot) + of KeystoreKind.Remote: + let + httpFlags = + block: + var res: set[HttpClientFlag] + if RemoteKeystoreFlag.IgnoreSSLVerification in keystore.flags: + res.incl({HttpClientFlag.NoVerifyHost, + HttpClientFlag.NoVerifyServerName}) + res + prestoFlags = {RestClientFlag.CommaSeparatedArray} + clients = + block: + var res: seq[(RestClientRef, RemoteSignerInfo)] + for remote in keystore.remotes: + let client = RestClientRef.new($remote.url, prestoFlags, + httpFlags) + if client.isErr(): + warn "Unable to resolve distributed signer address", + remote_url = $remote.url, validator = $remote.pubkey + else: + res.add((client.get(), remote)) + res + if len(clients) > 0: + vc.attachedValidators.addRemoteValidator(keystore, clients, + none[ValidatorIndex](), slot) + else: + warn "Unable to initialize remote validator", + validator = $keystore.pubkey + +proc removeValidator*(vc: ValidatorClientRef, + pubkey: ValidatorPubKey) {.async.} = + let validator = vc.attachedValidators.getValidator(pubkey) + if not(isNil(validator)): + if vc.config.doppelgangerDetection: + if validator.index.isSome(): + vc.removeDoppelganger(validator.index.get()) + case validator.kind + of ValidatorKind.Local: + discard + of ValidatorKind.Remote: + # We must close all the REST clients running for the remote validator. + let pending = + block: + var res: seq[Future[void]] + for item in validator.clients: + res.add(item[0].closeWait()) + res + await allFutures(pending) + # Remove validator from ValidatorPool. 
+ vc.attachedValidators.removeValidator(pubkey) + +proc doppelgangerCheck*(vc: ValidatorClientRef, + validator: AttachedValidator): bool = + if vc.config.doppelgangerDetection: + if validator.index.isNone(): + return false + if validator.startSlot > GENESIS_SLOT: + let + vindex = validator.index.get() + default = DoppelgangerState(status: DoppelgangerStatus.None) + currentEpoch = vc.currentSlot().epoch() + state = vc.doppelgangerDetection.validators.getOrDefault(vindex, + default) + state.status == DoppelgangerStatus.Passed + else: + true + else: + true diff --git a/beacon_chain/validator_client/doppelganger_service.nim b/beacon_chain/validator_client/doppelganger_service.nim new file mode 100644 index 000000000..3721868bd --- /dev/null +++ b/beacon_chain/validator_client/doppelganger_service.nim @@ -0,0 +1,117 @@ +# beacon_chain +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. 
+
+import chronicles
+import common, api
+
+const
+  ServiceName = "doppelganger_service"
+  DOPPELGANGER_EPOCHS_COUNT = 2
+
+logScope: service = ServiceName
+
+proc getCheckingList*(vc: ValidatorClientRef): seq[ValidatorIndex] =
+  ## Returns the indices of all validators whose status is `Checking`.
+  var res: seq[ValidatorIndex]
+  for index, value in vc.doppelgangerDetection.validators.pairs():
+    if value.status == DoppelgangerStatus.Checking:
+      res.add(index)
+  res
+
+proc waitForNextEpoch(service: DoppelgangerServiceRef) {.async.} =
+  ## Sleeps for `durationToNextEpoch()` plus `TIME_DELAY_FROM_SLOT`.
+  let vc = service.client
+  let sleepTime = vc.beaconClock.durationToNextEpoch() + TIME_DELAY_FROM_SLOT
+  debug "Sleeping until next epoch", sleep_time = sleepTime
+  await sleepAsync(sleepTime)
+
+proc processActivities(service: DoppelgangerServiceRef, epoch: Epoch,
+                       activities: GetValidatorsActivityResponse) =
+  ## Updates every `Checking` validator from the activity report for `epoch`.
+  ## Seen activity fires `gracefulExit` and skips the rest of the report.
+  let vc = service.client
+  if len(activities.data) == 0:
+    debug "Unable to monitor validator's activity for epoch", epoch = epoch
+    for index, value in vc.doppelgangerDetection.validators.mpairs():
+      if value.status == DoppelgangerStatus.Checking:
+        value.epochsCount = 0'u64
+        value.lastAttempt = DoppelgangerAttempt.Failure
+  else:
+    for activity in activities.data:
+      let vindex = activity.index
+      vc.doppelgangerDetection.validators.withValue(vindex, value):
+        if value.status == DoppelgangerStatus.Checking:
+          if activity.active:
+            value.epochsCount = 0'u64
+            value.lastAttempt = DoppelgangerAttempt.SuccessTrue
+            warn "Validator's activity has been seen",
+                 validator_index = vindex, epoch = epoch
+            vc.gracefulExit.fire()
+            return
+          else:
+            value.lastAttempt = DoppelgangerAttempt.SuccessFalse
+            if value.epochsCount == DOPPELGANGER_EPOCHS_COUNT:
+              value.status = DoppelgangerStatus.Passed
+              info "Validator successfully passed doppelganger detection",
+                   validator_index = vindex
+            else:
+              inc(value.epochsCount)
+              notice "Validator's activity was not seen",
+                     validator_index = vindex, epoch = epoch,
+                     epochs_count = value.epochsCount
+
+proc mainLoop(service: DoppelgangerServiceRef) {.async.} =
+  ## Once per epoch, fetches and processes the previous epoch's activity of
+  ## all `Checking` validators; stops on cancellation or any other error.
+  let vc = service.client
+  service.state = ServiceState.Running
+
+  if not service.enabled:
+    debug "Service disabled because of configuration settings"
+    return
+  debug "Service started"
+
+  while true:
+    let breakLoop =
+      try:
+        await service.waitForNextEpoch()
+        let
+          currentEpoch = vc.currentSlot().epoch()
+          previousEpoch =
+            if currentEpoch == Epoch(0): currentEpoch
+            else: currentEpoch - 1'u64
+          validators = vc.getCheckingList()
+        if len(validators) > 0:
+          let activities = await vc.getValidatorsActivity(previousEpoch,
+                                                          validators)
+          service.processActivities(previousEpoch, activities)
+        else:
+          debug "No validators found that require doppelganger protection"
+        false
+      except CancelledError:
+        debug "Service interrupted"
+        true
+      except CatchableError as exc:
+        warn "Service crashed with unexpected error", err_name = exc.name,
+             err_msg = exc.msg
+        true
+
+    if breakLoop: break
+
+proc init*(t: typedesc[DoppelgangerServiceRef],
+           vc: ValidatorClientRef): Future[DoppelgangerServiceRef] {.async.} =
+  ## Creates the service; `enabled` mirrors `vc.config.doppelgangerDetection`.
+  logScope: service = ServiceName
+  let res = DoppelgangerServiceRef(name: ServiceName,
+                                   client: vc, state: ServiceState.Initialized,
+                                   enabled: vc.config.doppelgangerDetection)
+  debug "Initializing service"
+  return res
+
+proc start*(service: DoppelgangerServiceRef) =
+  ## Launches `mainLoop` and stores its future in `service.lifeFut`.
+  service.lifeFut = mainLoop(service)
diff --git a/beacon_chain/validator_client/duties_service.nim b/beacon_chain/validator_client/duties_service.nim
index 3dc89dc95..19e0d2ab9 100644
--- a/beacon_chain/validator_client/duties_service.nim
+++ b/beacon_chain/validator_client/duties_service.nim
@@ -79,6 +79,9 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} = pubkey = item.validator.pubkey, index = item.index vc.attachedValidators.updateValidator(item.validator.pubkey, item.index) + # Adding validator for doppelganger detection.
+ vc.addDoppelganger( + vc.attachedValidators.getValidator(item.validator.pubkey)) proc pollForAttesterDuties*(vc: ValidatorClientRef, epoch: Epoch): Future[int] {.async.} = diff --git a/beacon_chain/validators/validator_duties.nim b/beacon_chain/validators/validator_duties.nim index 70a01a359..904e9a57f 100644 --- a/beacon_chain/validators/validator_duties.nim +++ b/beacon_chain/validators/validator_duties.nim @@ -84,14 +84,14 @@ proc findValidator(validators: auto, pubkey: ValidatorPubKey): some(idx.ValidatorIndex) proc addLocalValidator(node: BeaconNode, validators: auto, - item: KeystoreData) = + item: KeystoreData, slot: Slot) = let pubkey = item.pubkey index = findValidator(validators, pubkey) - node.attachedValidators[].addLocalValidator(item, index) + node.attachedValidators[].addLocalValidator(item, index, slot) proc addRemoteValidator(pool: var ValidatorPool, validators: auto, - item: KeystoreData) = + item: KeystoreData, slot: Slot) = var clients: seq[(RestClientRef, RemoteSignerInfo)] let httpFlags = block: @@ -108,20 +108,22 @@ proc addRemoteValidator(pool: var ValidatorPool, validators: auto, remote_url = $remote.url, validator = $remote.pubkey clients.add((client.get(), remote)) let index = findValidator(validators, item.pubkey) - pool.addRemoteValidator(item, clients, index) + pool.addRemoteValidator(item, clients, index, slot) proc addLocalValidators*(node: BeaconNode, validators: openArray[KeystoreData]) = + let slot = node.currentSlot() withState(node.dag.headState): for item in validators: - node.addLocalValidator(state.data.validators.asSeq(), item) + node.addLocalValidator(state.data.validators.asSeq(), item, slot) proc addRemoteValidators*(node: BeaconNode, validators: openArray[KeystoreData]) = + let slot = node.currentSlot() withState(node.dag.headState): for item in validators: node.attachedValidators[].addRemoteValidator( - state.data.validators.asSeq(), item) + state.data.validators.asSeq(), item, slot) proc addValidators*(node: 
BeaconNode) = let (localValidators, remoteValidators) = diff --git a/beacon_chain/validators/validator_pool.nim b/beacon_chain/validators/validator_pool.nim index 232f358e8..7573305ae 100644 --- a/beacon_chain/validators/validator_pool.nim +++ b/beacon_chain/validators/validator_pool.nim @@ -53,6 +53,8 @@ type # if the validator will be aggregating (in the near future) slotSignature*: Option[tuple[slot: Slot, signature: ValidatorSig]] + startSlot*: Slot + SignResponse* = Web3SignerDataResponse SignatureResult* = Result[ValidatorSig, string] @@ -81,27 +83,32 @@ template count*(pool: ValidatorPool): int = len(pool.validators) proc addLocalValidator*(pool: var ValidatorPool, item: KeystoreData, - index: Option[ValidatorIndex]) = + index: Option[ValidatorIndex], slot: Slot) = doAssert item.kind == KeystoreKind.Local let pubkey = item.pubkey let v = AttachedValidator(kind: ValidatorKind.Local, pubkey: pubkey, - index: index, data: item) + index: index, data: item, startSlot: slot) pool.validators[pubkey] = v - notice "Local validator attached", pubkey, validator = shortLog(v) + notice "Local validator attached", pubkey, validator = shortLog(v), + start_slot = slot validators.set(pool.count().int64) -proc addLocalValidator*(pool: var ValidatorPool, item: KeystoreData) = - addLocalValidator(pool, item, none[ValidatorIndex]()) +proc addLocalValidator*(pool: var ValidatorPool, item: KeystoreData, + slot: Slot) = + addLocalValidator(pool, item, none[ValidatorIndex](), slot) proc addRemoteValidator*(pool: var ValidatorPool, item: KeystoreData, - clients: seq[(RestClientRef, RemoteSignerInfo)], index: Option[ValidatorIndex]) = + clients: seq[(RestClientRef, RemoteSignerInfo)], + index: Option[ValidatorIndex], slot: Slot) = doAssert item.kind == KeystoreKind.Remote let pubkey = item.pubkey let v = AttachedValidator(kind: ValidatorKind.Remote, pubkey: pubkey, - index: index, data: item, clients: clients) + index: index, data: item, clients: clients, + startSlot: slot) 
pool.validators[pubkey] = v notice "Remote validator attached", pubkey, validator = shortLog(v), - remote_signer = $item.remotes + remote_signer = $item.remotes, + start_slot = slot validators.set(pool.count().int64) proc getValidator*(pool: ValidatorPool,