VC: Add pruning slashing database. (#5551)

* Add slashing database pruning to VC.
Fix GetBlockHeaderResponse object declaration (spec has been changed).

* Switch to getFinalizedBlockHeader instead.

* Fix proper sign.
Add statements.
Show pruning log statement only when pruning happens.

* Optimize and remove debugging helpers.
This commit is contained in:
Eugene Kabanov 2023-11-06 16:40:44 +02:00 committed by GitHub
parent eb7c8b7db2
commit 49c851109e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 173 additions and 2 deletions

View File

@ -130,6 +130,7 @@ type
DataRootEnclosedObject | DataRootEnclosedObject |
DataOptimisticObject | DataOptimisticObject |
DataVersionEnclosedObject | DataVersionEnclosedObject |
DataOptimisticAndFinalizedObject |
GetBlockV2Response | GetBlockV2Response |
GetDistributedKeystoresResponse | GetDistributedKeystoresResponse |
GetKeystoresResponse | GetKeystoresResponse |

View File

@ -395,6 +395,11 @@ type
data*: T data*: T
execution_optimistic*: Option[bool] execution_optimistic*: Option[bool]
DataOptimisticAndFinalizedObject*[T] = object
data*: T
execution_optimistic*: Option[bool]
finalized*: Option[bool]
ForkedSignedBlockHeader* = object ForkedSignedBlockHeader* = object
message*: uint32 # message offset message*: uint32 # message offset
signature*: ValidatorSig signature*: ValidatorSig
@ -515,7 +520,7 @@ type
GetAggregatedAttestationResponse* = DataEnclosedObject[Attestation] GetAggregatedAttestationResponse* = DataEnclosedObject[Attestation]
GetAttesterDutiesResponse* = DataRootEnclosedObject[seq[RestAttesterDuty]] GetAttesterDutiesResponse* = DataRootEnclosedObject[seq[RestAttesterDuty]]
GetBlockAttestationsResponse* = DataEnclosedObject[seq[Attestation]] GetBlockAttestationsResponse* = DataEnclosedObject[seq[Attestation]]
GetBlockHeaderResponse* = DataOptimisticObject[RestBlockHeaderInfo] GetBlockHeaderResponse* = DataOptimisticAndFinalizedObject[RestBlockHeaderInfo]
GetBlockHeadersResponse* = DataEnclosedObject[seq[RestBlockHeaderInfo]] GetBlockHeadersResponse* = DataEnclosedObject[seq[RestBlockHeaderInfo]]
GetBlockRootResponse* = DataOptimisticObject[RestRoot] GetBlockRootResponse* = DataOptimisticObject[RestRoot]
GetDebugChainHeadsResponse* = DataEnclosedObject[seq[RestChainHead]] GetDebugChainHeadsResponse* = DataEnclosedObject[seq[RestChainHead]]

View File

@ -2570,3 +2570,92 @@ proc getValidatorsLiveness*(
res res
return GetValidatorsLivenessResponse(data: response) return GetValidatorsLivenessResponse(data: response)
proc getFinalizedBlockHeader*(
vc: ValidatorClientRef,
): Future[Opt[GetBlockHeaderResponse]] {.async.} =
const RequestName = "getFinalizedBlockHeader"
let
blockIdent = BlockIdent.init(BlockIdentType.Finalized)
resp = vc.onceToAll(RestPlainResponse,
SlotDuration,
ViableNodeStatus,
{BeaconNodeRole.Duties},
getBlockHeaderPlain(it, blockIdent))
case resp.status
of ApiOperation.Timeout:
debug "Unable to obtain finalized block header in time",
timeout = SlotDuration
return Opt.none(GetBlockHeaderResponse)
of ApiOperation.Interrupt:
debug "Finalized block header request was interrupted"
return Opt.none(GetBlockHeaderResponse)
of ApiOperation.Failure:
debug "Unexpected error happened while trying to get finalized block header"
return Opt.none(GetBlockHeaderResponse)
of ApiOperation.Success:
var oldestBlockHeader: GetBlockHeaderResponse
var oldestEpoch: Opt[Epoch]
for apiResponse in resp.data:
if apiResponse.data.isErr():
debug "Unable to get finalized block header",
endpoint = apiResponse.node, error = apiResponse.data.error
else:
let response = apiResponse.data.get()
case response.status
of 200:
let res = decodeBytes(GetBlockHeaderResponse,
response.data, response.contentType)
if res.isOk():
let
rdata = res.get()
epoch = rdata.data.header.message.slot.epoch()
if oldestEpoch.get(FAR_FUTURE_EPOCH) > epoch:
oldestEpoch = Opt.some(epoch)
oldestBlockHeader = rdata
else:
let failure = ApiNodeFailure.init(
ApiFailure.UnexpectedResponse, RequestName,
apiResponse.node, response.status, $res.error)
# We do not update beacon node's status anymore because of
# issue #5377.
debug ResponseDecodeError, reason = getFailureReason(failure)
continue
of 400:
let failure = ApiNodeFailure.init(
ApiFailure.Invalid, RequestName,
apiResponse.node, response.status, response.getErrorMessage())
# We do not update beacon node's status anymore because of
# issue #5377.
debug ResponseInvalidError, reason = getFailureReason(failure)
continue
of 404:
let failure = ApiNodeFailure.init(
ApiFailure.NotFound, RequestName,
apiResponse.node, response.status, response.getErrorMessage())
# We do not update beacon node's status anymore because of
# issue #5377.
debug ResponseNotFoundError, reason = getFailureReason(failure)
continue
of 500:
let failure = ApiNodeFailure.init(
ApiFailure.Internal, RequestName,
apiResponse.node, response.status, response.getErrorMessage())
# We do not update beacon node's status anymore because of
# issue #5377.
debug ResponseInternalError, reason = getFailureReason(failure)
continue
else:
let failure = ApiNodeFailure.init(
ApiFailure.UnexpectedCode, RequestName,
apiResponse.node, response.status, response.getErrorMessage())
# We do not update beacon node's status anymore because of
# issue #5377.
debug ResponseUnexpectedError, reason = getFailureReason(failure)
continue
if oldestEpoch.isSome():
return Opt.some(oldestBlockHeader)
else:
return Opt.none(GetBlockHeaderResponse)

View File

@ -65,7 +65,9 @@ type
DutiesServiceRef* = ref object of ClientServiceRef DutiesServiceRef* = ref object of ClientServiceRef
pollingAttesterDutiesTask*: Future[void] pollingAttesterDutiesTask*: Future[void]
pollingSyncDutiesTask*: Future[void] pollingSyncDutiesTask*: Future[void]
pruneSlashingDatabaseTask*: Future[void]
syncSubscriptionEpoch*: Opt[Epoch] syncSubscriptionEpoch*: Opt[Epoch]
lastSlashingEpoch*: Opt[Epoch]
FallbackServiceRef* = ref object of ClientServiceRef FallbackServiceRef* = ref object of ClientServiceRef
changesEvent*: AsyncEvent changesEvent*: AsyncEvent
@ -229,6 +231,7 @@ type
blocksSeen*: Table[Slot, BlockDataItem] blocksSeen*: Table[Slot, BlockDataItem]
rootsSeen*: Table[Eth2Digest, Slot] rootsSeen*: Table[Eth2Digest, Slot]
processingDelay*: Opt[Duration] processingDelay*: Opt[Duration]
finalizedEpoch*: Opt[Epoch]
rng*: ref HmacDrbgContext rng*: ref HmacDrbgContext
ApiStrategyKind* {.pure.} = enum ApiStrategyKind* {.pure.} = enum

View File

@ -19,7 +19,8 @@ logScope: service = ServiceName
type type
DutiesServiceLoop* = enum DutiesServiceLoop* = enum
AttesterLoop, ProposerLoop, IndicesLoop, SyncCommitteeLoop, AttesterLoop, ProposerLoop, IndicesLoop, SyncCommitteeLoop,
ProposerPreparationLoop, ValidatorRegisterLoop, DynamicValidatorsLoop ProposerPreparationLoop, ValidatorRegisterLoop, DynamicValidatorsLoop,
SlashPruningLoop
chronicles.formatIt(DutiesServiceLoop): chronicles.formatIt(DutiesServiceLoop):
case it case it
@ -30,6 +31,7 @@ chronicles.formatIt(DutiesServiceLoop):
of ProposerPreparationLoop: "proposer_prepare_loop" of ProposerPreparationLoop: "proposer_prepare_loop"
of ValidatorRegisterLoop: "validator_register_loop" of ValidatorRegisterLoop: "validator_register_loop"
of DynamicValidatorsLoop: "dynamic_validators_loop" of DynamicValidatorsLoop: "dynamic_validators_loop"
of SlashPruningLoop: "slashing_pruning_loop"
proc checkDuty(duty: RestAttesterDuty): bool = proc checkDuty(duty: RestAttesterDuty): bool =
(duty.committee_length <= MAX_VALIDATORS_PER_COMMITTEE) and (duty.committee_length <= MAX_VALIDATORS_PER_COMMITTEE) and
@ -677,6 +679,70 @@ proc syncCommitteeDutiesLoop(service: DutiesServiceRef) {.async.} =
# Spawning new attestation duties task. # Spawning new attestation duties task.
service.pollingSyncDutiesTask = service.pollForSyncCommitteeDuties() service.pollingSyncDutiesTask = service.pollForSyncCommitteeDuties()
proc getNextEpochMiddleSlot(vc: ValidatorClientRef): Slot =
let
middleSlot = Slot(SLOTS_PER_EPOCH div 2)
currentSlot = vc.beaconClock.now().slotOrZero()
slotInEpoch = currentSlot.since_epoch_start()
if slotInEpoch >= middleSlot:
(currentSlot.epoch + 1'u64).start_slot() + uint64(middleSlot)
else:
currentSlot + (uint64(middleSlot) - uint64(slotInEpoch))
proc pruneSlashingDatabase(service: DutiesServiceRef) {.async.} =
let
vc = service.client
currentSlot = vc.beaconClock.now().slotOrZero()
startTime = Moment.now()
blockHeader =
try:
await vc.getFinalizedBlockHeader()
except CancelledError as exc:
debug "Finalized block header request was interrupted",
slot = currentSlot
raise exc
except CatchableError as exc:
error "Unexpected error occured while requesting " &
"finalized block header", slot = currentSlot,
err_name = exc.name, err_msg = exc.msg
Opt.none(GetBlockHeaderResponse)
checkpointTime = Moment.now()
if blockHeader.isSome():
let epoch = blockHeader.get().data.header.message.slot.epoch
vc.finalizedEpoch = Opt.some(epoch)
if service.lastSlashingEpoch.get(FAR_FUTURE_EPOCH) != epoch:
vc.attachedValidators[]
.slashingProtection
.pruneAfterFinalization(epoch)
service.lastSlashingEpoch = Opt.some(epoch)
let finishTime = Moment.now()
debug "Slashing database has been pruned", slot = currentSlot,
epoch = currentSlot.epoch(),
finalized_epoch = epoch,
elapsed_time = (finishTime - startTime),
pruning_time = (finishTime - checkpointTime)
proc slashingDatabasePruningLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
debug "Slashing database pruning loop is waiting for initialization"
await allFutures(
vc.preGenesisEvent.wait(),
vc.indicesAvailable.wait(),
vc.forksAvailable.wait()
)
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
while true:
let slot = await vc.checkedWaitForSlot(vc.getNextEpochMiddleSlot(),
aggregateSlotOffset, false)
if slot.isNone():
continue
if not(isNil(service.pruneSlashingDatabaseTask)) and
not(service.pruneSlashingDatabaseTask.finished()):
await cancelAndWait(service.pruneSlashingDatabaseTask)
service.pruneSlashingDatabaseTask = service.pruneSlashingDatabase()
template checkAndRestart(serviceLoop: DutiesServiceLoop, template checkAndRestart(serviceLoop: DutiesServiceLoop,
future: Future[void], body: untyped): untyped = future: Future[void], body: untyped): untyped =
if future.finished(): if future.finished():
@ -715,6 +781,7 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
else: else:
debug "Dynamic validators update loop disabled" debug "Dynamic validators update loop disabled"
@[] @[]
slashPruningFut = service.slashingDatabasePruningLoop()
web3SignerUrls = vc.config.web3SignerUrls web3SignerUrls = vc.config.web3SignerUrls
while true: while true:
@ -729,6 +796,7 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
FutureBase(indicesFut), FutureBase(indicesFut),
FutureBase(syncFut), FutureBase(syncFut),
FutureBase(prepareFut), FutureBase(prepareFut),
FutureBase(slashPruningFut)
] ]
for fut in dynamicFuts: for fut in dynamicFuts:
futures.add fut futures.add fut
@ -749,6 +817,8 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
service.dynamicValidatorsLoop( service.dynamicValidatorsLoop(
web3SignerUrls[i], web3SignerUrls[i],
vc.config.web3signerUpdateInterval)) vc.config.web3signerUpdateInterval))
checkAndRestart(SlashPruningLoop, slashPruningFut,
service.slashingDatabasePruningLoop())
false false
except CancelledError: except CancelledError:
debug "Service interrupted" debug "Service interrupted"
@ -774,6 +844,9 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
if not(isNil(service.pollingSyncDutiesTask)) and if not(isNil(service.pollingSyncDutiesTask)) and
not(service.pollingSyncDutiesTask.finished()): not(service.pollingSyncDutiesTask.finished()):
pending.add(service.pollingSyncDutiesTask.cancelAndWait()) pending.add(service.pollingSyncDutiesTask.cancelAndWait())
if not(isNil(service.pruneSlashingDatabaseTask)) and
not(service.pruneSlashingDatabaseTask.finished()):
pending.add(service.pruneSlashingDatabaseTask.cancelAndWait())
await allFutures(pending) await allFutures(pending)
true true
except CatchableError as exc: except CatchableError as exc: