diff --git a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim index 1fd672f1a..546bb2d38 100644 --- a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim +++ b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim @@ -130,6 +130,7 @@ type DataRootEnclosedObject | DataOptimisticObject | DataVersionEnclosedObject | + DataOptimisticAndFinalizedObject | GetBlockV2Response | GetDistributedKeystoresResponse | GetKeystoresResponse | diff --git a/beacon_chain/spec/eth2_apis/rest_types.nim b/beacon_chain/spec/eth2_apis/rest_types.nim index 9280cb666..b442a495c 100644 --- a/beacon_chain/spec/eth2_apis/rest_types.nim +++ b/beacon_chain/spec/eth2_apis/rest_types.nim @@ -395,6 +395,11 @@ type data*: T execution_optimistic*: Option[bool] + DataOptimisticAndFinalizedObject*[T] = object + data*: T + execution_optimistic*: Option[bool] + finalized*: Option[bool] + ForkedSignedBlockHeader* = object message*: uint32 # message offset signature*: ValidatorSig @@ -515,7 +520,7 @@ type GetAggregatedAttestationResponse* = DataEnclosedObject[Attestation] GetAttesterDutiesResponse* = DataRootEnclosedObject[seq[RestAttesterDuty]] GetBlockAttestationsResponse* = DataEnclosedObject[seq[Attestation]] - GetBlockHeaderResponse* = DataOptimisticObject[RestBlockHeaderInfo] + GetBlockHeaderResponse* = DataOptimisticAndFinalizedObject[RestBlockHeaderInfo] GetBlockHeadersResponse* = DataEnclosedObject[seq[RestBlockHeaderInfo]] GetBlockRootResponse* = DataOptimisticObject[RestRoot] GetDebugChainHeadsResponse* = DataEnclosedObject[seq[RestChainHead]] diff --git a/beacon_chain/validator_client/api.nim b/beacon_chain/validator_client/api.nim index 89855588b..9304a73aa 100644 --- a/beacon_chain/validator_client/api.nim +++ b/beacon_chain/validator_client/api.nim @@ -2570,3 +2570,92 @@ proc getValidatorsLiveness*( res return GetValidatorsLivenessResponse(data: response) + +proc getFinalizedBlockHeader*( + vc: ValidatorClientRef, + ): Future[Opt[GetBlockHeaderResponse]] {.async.} = + const RequestName = "getFinalizedBlockHeader" + + let + blockIdent = BlockIdent.init(BlockIdentType.Finalized) + resp = vc.onceToAll(RestPlainResponse, + SlotDuration, + ViableNodeStatus, + {BeaconNodeRole.Duties}, + getBlockHeaderPlain(it, blockIdent)) + case resp.status + of ApiOperation.Timeout: + debug "Unable to obtain finalized block header in time", + timeout = SlotDuration + return Opt.none(GetBlockHeaderResponse) + of ApiOperation.Interrupt: + debug "Finalized block header request was interrupted" + return Opt.none(GetBlockHeaderResponse) + of ApiOperation.Failure: + debug "Unexpected error happened while trying to get finalized block header" + return Opt.none(GetBlockHeaderResponse) + of ApiOperation.Success: + var oldestBlockHeader: GetBlockHeaderResponse + var oldestEpoch: Opt[Epoch] + for apiResponse in resp.data: + if apiResponse.data.isErr(): + debug "Unable to get finalized block header", + endpoint = apiResponse.node, error = apiResponse.data.error + else: + let response = apiResponse.data.get() + case response.status + of 200: + let res = decodeBytes(GetBlockHeaderResponse, + response.data, response.contentType) + if res.isOk(): + let + rdata = res.get() + epoch = rdata.data.header.message.slot.epoch() + if oldestEpoch.get(FAR_FUTURE_EPOCH) > epoch: + oldestEpoch = Opt.some(epoch) + oldestBlockHeader = rdata + else: + let failure = ApiNodeFailure.init( + ApiFailure.UnexpectedResponse, RequestName, + apiResponse.node, response.status, $res.error) + # We do not update beacon node's status anymore because of + # issue #5377. + debug ResponseDecodeError, reason = getFailureReason(failure) + continue + of 400: + let failure = ApiNodeFailure.init( + ApiFailure.Invalid, RequestName, + apiResponse.node, response.status, response.getErrorMessage()) + # We do not update beacon node's status anymore because of + # issue #5377. + debug ResponseInvalidError, reason = getFailureReason(failure) + continue + of 404: + let failure = ApiNodeFailure.init( + ApiFailure.NotFound, RequestName, + apiResponse.node, response.status, response.getErrorMessage()) + # We do not update beacon node's status anymore because of + # issue #5377. + debug ResponseNotFoundError, reason = getFailureReason(failure) + continue + of 500: + let failure = ApiNodeFailure.init( + ApiFailure.Internal, RequestName, + apiResponse.node, response.status, response.getErrorMessage()) + # We do not update beacon node's status anymore because of + # issue #5377. + debug ResponseInternalError, reason = getFailureReason(failure) + continue + else: + let failure = ApiNodeFailure.init( + ApiFailure.UnexpectedCode, RequestName, + apiResponse.node, response.status, response.getErrorMessage()) + # We do not update beacon node's status anymore because of + # issue #5377. + debug ResponseUnexpectedError, reason = getFailureReason(failure) + continue + + if oldestEpoch.isSome(): + return Opt.some(oldestBlockHeader) + else: + return Opt.none(GetBlockHeaderResponse) diff --git a/beacon_chain/validator_client/common.nim b/beacon_chain/validator_client/common.nim index eb4a50557..2d6dd0767 100644 --- a/beacon_chain/validator_client/common.nim +++ b/beacon_chain/validator_client/common.nim @@ -65,7 +65,9 @@ type DutiesServiceRef* = ref object of ClientServiceRef pollingAttesterDutiesTask*: Future[void] pollingSyncDutiesTask*: Future[void] + pruneSlashingDatabaseTask*: Future[void] syncSubscriptionEpoch*: Opt[Epoch] + lastSlashingEpoch*: Opt[Epoch] FallbackServiceRef* = ref object of ClientServiceRef changesEvent*: AsyncEvent @@ -229,6 +231,7 @@ type blocksSeen*: Table[Slot, BlockDataItem] rootsSeen*: Table[Eth2Digest, Slot] processingDelay*: Opt[Duration] + finalizedEpoch*: Opt[Epoch] rng*: ref HmacDrbgContext ApiStrategyKind* {.pure.} = enum diff --git a/beacon_chain/validator_client/duties_service.nim b/beacon_chain/validator_client/duties_service.nim index fe5a6a431..561f1bb8a 100644 --- a/beacon_chain/validator_client/duties_service.nim +++ b/beacon_chain/validator_client/duties_service.nim @@ -19,7 +19,8 @@ logScope: service = ServiceName type DutiesServiceLoop* = enum AttesterLoop, ProposerLoop, IndicesLoop, SyncCommitteeLoop, - ProposerPreparationLoop, ValidatorRegisterLoop, DynamicValidatorsLoop + ProposerPreparationLoop, ValidatorRegisterLoop, DynamicValidatorsLoop, + SlashPruningLoop chronicles.formatIt(DutiesServiceLoop): case it @@ -30,6 +31,7 @@ chronicles.formatIt(DutiesServiceLoop): of ProposerPreparationLoop: "proposer_prepare_loop" of ValidatorRegisterLoop: "validator_register_loop" of DynamicValidatorsLoop: "dynamic_validators_loop" + of SlashPruningLoop: "slashing_pruning_loop" proc checkDuty(duty: RestAttesterDuty): bool = (duty.committee_length <= MAX_VALIDATORS_PER_COMMITTEE) and @@ -677,6 +679,70 @@ proc syncCommitteeDutiesLoop(service: DutiesServiceRef) {.async.} = # Spawning new attestation duties task. service.pollingSyncDutiesTask = service.pollForSyncCommitteeDuties() +proc getNextEpochMiddleSlot(vc: ValidatorClientRef): Slot = + let + middleSlot = Slot(SLOTS_PER_EPOCH div 2) + currentSlot = vc.beaconClock.now().slotOrZero() + slotInEpoch = currentSlot.since_epoch_start() + + if slotInEpoch >= middleSlot: + (currentSlot.epoch + 1'u64).start_slot() + uint64(middleSlot) + else: + currentSlot + (uint64(middleSlot) - uint64(slotInEpoch)) + +proc pruneSlashingDatabase(service: DutiesServiceRef) {.async.} = + let + vc = service.client + currentSlot = vc.beaconClock.now().slotOrZero() + startTime = Moment.now() + blockHeader = + try: + await vc.getFinalizedBlockHeader() + except CancelledError as exc: + debug "Finalized block header request was interrupted", + slot = currentSlot + raise exc + except CatchableError as exc: + error "Unexpected error occured while requesting " & + "finalized block header", slot = currentSlot, + err_name = exc.name, err_msg = exc.msg + Opt.none(GetBlockHeaderResponse) + checkpointTime = Moment.now() + if blockHeader.isSome(): + let epoch = blockHeader.get().data.header.message.slot.epoch + vc.finalizedEpoch = Opt.some(epoch) + if service.lastSlashingEpoch.get(FAR_FUTURE_EPOCH) != epoch: + vc.attachedValidators[] + .slashingProtection + .pruneAfterFinalization(epoch) + service.lastSlashingEpoch = Opt.some(epoch) + let finishTime = Moment.now() + debug "Slashing database has been pruned", slot = currentSlot, + epoch = currentSlot.epoch(), + finalized_epoch = epoch, + elapsed_time = (finishTime - startTime), + pruning_time = (finishTime - checkpointTime) + +proc slashingDatabasePruningLoop(service: DutiesServiceRef) {.async.} = + let vc = service.client + debug "Slashing database pruning loop is waiting for initialization" + await allFutures( + vc.preGenesisEvent.wait(), + vc.indicesAvailable.wait(), + vc.forksAvailable.wait() + ) + doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point") + while true: + let slot = await vc.checkedWaitForSlot(vc.getNextEpochMiddleSlot(), + aggregateSlotOffset, false) + if slot.isNone(): + continue + + if not(isNil(service.pruneSlashingDatabaseTask)) and + not(service.pruneSlashingDatabaseTask.finished()): + await cancelAndWait(service.pruneSlashingDatabaseTask) + service.pruneSlashingDatabaseTask = service.pruneSlashingDatabase() + template checkAndRestart(serviceLoop: DutiesServiceLoop, future: Future[void], body: untyped): untyped = if future.finished(): @@ -715,6 +781,7 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = else: debug "Dynamic validators update loop disabled" @[] + slashPruningFut = service.slashingDatabasePruningLoop() web3SignerUrls = vc.config.web3SignerUrls while true: @@ -729,6 +796,7 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = FutureBase(indicesFut), FutureBase(syncFut), FutureBase(prepareFut), + FutureBase(slashPruningFut) ] for fut in dynamicFuts: futures.add fut @@ -749,6 +817,8 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = service.dynamicValidatorsLoop( web3SignerUrls[i], vc.config.web3signerUpdateInterval)) + checkAndRestart(SlashPruningLoop, slashPruningFut, + service.slashingDatabasePruningLoop()) false except CancelledError: debug "Service interrupted" @@ -774,6 +844,9 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = if not(isNil(service.pollingSyncDutiesTask)) and not(service.pollingSyncDutiesTask.finished()): pending.add(service.pollingSyncDutiesTask.cancelAndWait()) + if not(isNil(service.pruneSlashingDatabaseTask)) and + not(service.pruneSlashingDatabaseTask.finished()): + pending.add(service.pruneSlashingDatabaseTask.cancelAndWait()) await allFutures(pending) true except CatchableError as exc: