VC: various fixes (#2730)

* Fix firstSuccess() template missing timeouts.

* Fix validator race condition.
Fix logs to be compatible with beacon_node logs.
Add CatchableError handlers to avoid crashes.
Move some logs from Notice to Debug level.
Fix some [unused] warnings.

* Fix block proposal issue for slots in the past and from the future.

* Change sent to published.

* Address review comments #1.
This commit is contained in:
Eugene Kabanov 2021-07-19 17:31:02 +03:00 committed by GitHub
parent 3e3e17fec3
commit f0c30e31b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 264 additions and 147 deletions

View File

@ -186,7 +186,7 @@ programMain:
fatal "Not enough beacon nodes in command line"
quit 1
debug "Launching validator client", version = fullVersionStr,
notice "Launching validator client", version = fullVersionStr,
cmdParams = commandLineParams(),
config,
beacon_nodes_count = len(beaconNodes)

View File

@ -342,27 +342,30 @@ template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc,
RestBeaconNodeStatus.Uninitalized}
let offlineNodes = vc.beaconNodes.filterIt(it.status in offlineMask)
warn "There no beacon nodes available, refreshing nodes status",
warn "No working beacon nodes available, refreshing nodes status",
online_nodes = len(onlineNodes), offline_nodes = len(offlineNodes)
var checkFut = vc.checkNodes(offlineMask)
let checkOp =
block:
var dontRushFut = sleepAsync(500.milliseconds)
if isNil(timerFut):
try:
# We use `allFutures()` to keep result in `checkFut`, but still
# be able to check errors.
await allFutures(checkFut, dontRushFut)
await allFutures(checkFut)
let onlineCount = vc.beaconNodes.countIt(
it.status == RestBeaconNodeStatus.Online)
if onlineCount == 0:
# Small pause here to avoid continous spam beacon nodes with
# checking requests.
await sleepAsync(500.milliseconds)
ApiOperation.Success
except CancelledError:
# `allFutures()` could not cancel Futures.
if not(checkFut.finished()):
checkFut.cancel()
if not(dontRushFut.finished()):
dontRushFut.cancel()
await allFutures(checkFut, dontRushFut)
await allFutures(checkFut)
ApiOperation.Interrupt
except CatchableError as exc:
# This only could happened if `race()` or `allFutures()` start raise
@ -370,24 +373,26 @@ template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc,
ApiOperation.Failure
else:
try:
discard await race(allFutures(checkFut, dontRushFut), timerFut)
discard await race(checkFut, timerFut)
if checkFut.finished():
let onlineCount = vc.beaconNodes.countIt(
it.status == RestBeaconNodeStatus.Online)
if onlineCount == 0:
# Small pause here to avoid continous spam beacon nodes with
# checking requests.
await sleepAsync(500.milliseconds)
ApiOperation.Success
else:
checkFut.cancel()
if not(dontRushFut.finished()):
dontRushFut.cancel()
await allFutures(checkFut, dontRushFut)
await allFutures(checkFut)
ApiOperation.Timeout
except CancelledError:
# `race()` and `allFutures()` could not cancel Futures.
if not(timerFut.finished()):
timerFut.cancel()
if not(dontRushFut.finished()):
dontRushFut.cancel()
if not(checkFut.finished()):
checkFut.cancel()
await allFutures(checkFut, dontRushFut, timerFut)
await allFutures(checkFut, timerFut)
ApiOperation.Interrupt
except CatchableError as exc:
# This only could happened if `race` or `allFutures` start raise

View File

@ -13,10 +13,12 @@ type
proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
duty: DutyAndProof): Future[bool] {.async.} =
let vc = service.client
let validator = vc.attachedValidators.getValidator(duty.data.pubkey)
if validator.index.isNone():
warn "Validator index is missing", validator = validator.pubKey
let validator =
block:
let res = vc.getValidator(duty.data.pubkey)
if res.isNone():
return false
res.get()
let fork = vc.fork.get()
@ -32,10 +34,10 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
adata.source.epoch,
adata.target.epoch, signingRoot)
if notSlashable.isErr():
warn "Slashing protection activated for attestation", slot = duty.data.slot,
validator = validator.pubKey,
validator_index = duty.data.validator_index,
badVoteDetails = $notSlashable.error
warn "Slashing protection activated for attestation",
slot = duty.data.slot,
validator = shortLog(validator),
validator_index = vindex, badVoteDetails = $notSlashable.error
return false
let attestation = await validator.produceAndSignAttestation(adata,
@ -46,24 +48,35 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
let res =
try:
await vc.submitPoolAttestations(@[attestation])
except ValidatorApiError as exc:
error "Unable to submit attestation", slot = duty.data.slot,
validator = validator.pubKey,
validator_index = duty.data.validator_index
raise exc
except ValidatorApiError:
error "Unable to publish attestation",
attestation = shortLog(attestation),
validator = shortLog(validator),
validator_index = vindex
return false
except CatchableError as exc:
error "Unexpected error occured while publishing attestation",
attestation = shortLog(attestation),
validator = shortLog(validator),
validator_index = vindex,
err_name = exc.name, err_msg = exc.msg
return false
let delay = vc.getDelay(seconds(int64(SECONDS_PER_SLOT) div 3))
let indexInCommittee = duty.data.validator_committee_index
if res:
notice "Attestation published", validator = validator.pubKey,
validator_index = duty.data.validator_index, slot = duty.data.slot,
delay = delay
return true
notice "Attestation published", attestation = shortLog(attestation),
validator = shortLog(validator),
validator_index = vindex,
delay = delay,
indexInCommittee = indexInCommittee
else:
warn "Attestation was not accepted by beacon node",
validator = validator.pubKey,
validator_index = duty.data.validator_index,
slot = duty.data.slot, delay = delay
return false
attestation = shortLog(attestation),
validator = shortLog(validator),
validator_index = vindex, delay = delay,
indexInCommittee = indexInCommittee
return res
proc serveAggregateAndProof*(service: AttestationServiceRef,
proof: AggregateAndProof,
@ -78,16 +91,40 @@ proc serveAggregateAndProof*(service: AttestationServiceRef,
genesisRoot)
let signedProof = SignedAggregateAndProof(message: proof,
signature: signature)
let aggregationSlot = proof.aggregate.data.slot
let vindex = validator.index.get()
let res =
try:
return await vc.publishAggregateAndProofs(@[signedProof]):
await vc.publishAggregateAndProofs(@[signedProof])
except ValidatorApiError:
warn "Unable to publish aggregate and proofs"
error "Unable to publish aggregated attestation",
attestation = shortLog(signedProof.message.aggregate),
validator = shortLog(validator),
aggregationSlot = aggregationSlot,
validator_index = vindex
return false
except CatchableError as exc:
error "Unexpected error happened", err_name = exc.name,
err_msg = exc.msg
error "Unexpected error occured while publishing aggregated attestation",
attestation = shortLog(signedProof.message.aggregate),
validator = shortLog(validator),
aggregationSlot = aggregationSlot,
validator_index = vindex,
err_name = exc.name, err_msg = exc.msg
return false
if res:
notice "Aggregated attestation published",
attestation = shortLog(signedProof.message.aggregate),
validator = shortLog(validator),
aggregationSlot = aggregationSlot, validator_index = vindex
else:
warn "Aggregated attestation was not accepted by beacon node",
attestation = shortLog(signedProof.message.aggregate),
validator = shortLog(validator),
aggregationSlot = aggregationSlot, validator_index = vindex
return res
proc produceAndPublishAttestations*(service: AttestationServiceRef,
slot: Slot, committee_index: CommitteeIndex,
duties: seq[DutyAndProof]
@ -108,7 +145,8 @@ proc produceAndPublishAttestations*(service: AttestationServiceRef,
if (duty.data.slot != ad.slot) or
(uint64(duty.data.committee_index) != ad.index):
error "Inconsistent validator duties during attestation signing",
validator = duty.data.pubkey, duty_slot = duty.data.slot,
validator = shortLog(duty.data.pubkey),
duty_slot = duty.data.slot,
duty_index = duty.data.committee_index,
attestation_slot = ad.slot, attestation_index = ad.index
continue
@ -182,7 +220,13 @@ proc produceAndPublishAggregates(service: AttestationServiceRef,
try:
await vc.getAggregatedAttestation(slot, attestationRoot)
except ValidatorApiError:
error "Unable to retrieve aggregated attestation data"
error "Unable to get aggregated attestation data", slot = slot,
attestation_root = shortLog(attestationRoot)
return
except CatchableError as exc:
error "Unexpected error occured while getting aggregated attestation",
slot = slot, attestation_root = shortLog(attestationRoot),
err_name = exc.name, err_msg = exc.msg
return
let pendingAggregates =
@ -219,13 +263,13 @@ proc produceAndPublishAggregates(service: AttestationServiceRef,
(succeed, errored, failed)
let delay = vc.getDelay(seconds((int64(SECONDS_PER_SLOT) div 3) * 2))
debug "Aggregate attestation statistics", total = len(pendingAggregates),
debug "Aggregated attestation statistics", total = len(pendingAggregates),
succeed = statistics[0], failed_to_deliver = statistics[1],
not_accepted = statistics[2], delay = delay, slot = slot,
committee_index = committeeIndex
else:
notice "No aggregate and proofs scheduled for slot", slot = slot,
debug "No aggregate and proofs scheduled for slot", slot = slot,
committee_index = committeeIndex
proc publishAttestationsAndAggregates(service: AttestationServiceRef,
@ -242,22 +286,26 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef,
# TODO (cheatfate): Here should be present timeout.
let startTime = Moment.now()
await vc.waitForBlockPublished(slot)
let finishTime = Moment.now()
debug "Block proposal awaited", slot = slot,
duration = (finishTime - startTime)
let dur = Moment.now() - startTime
debug "Block proposal awaited", slot = slot, duration = dur
block:
let delay = vc.getDelay(seconds(int64(SECONDS_PER_SLOT) div 3))
notice "Producing attestations", delay = delay, slot = slot,
debug "Producing attestations", delay = delay, slot = slot,
committee_index = committee_index,
duties_count = len(duties)
let ad =
try:
await service.produceAndPublishAttestations(slot, committee_index,
duties)
await service.produceAndPublishAttestations(slot, committee_index, duties)
except ValidatorApiError:
error "Unable to proceed attestations"
error "Unable to proceed attestations", slot = slot,
committee_index = committee_index, duties_count = len(duties)
return
except CatchableError as exc:
error "Unexpected error while producing attestations", slot = slot,
committee_index = committee_index, duties_count = len(duties),
err_name = exc.name, err_msg = exc.msg
return
if aggregateTime != ZeroDuration:
@ -265,7 +313,7 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef,
block:
let delay = vc.getDelay(seconds((int64(SECONDS_PER_SLOT) div 3) * 2))
notice "Producing aggregate and proofs", delay = delay
debug "Producing aggregate and proofs", delay = delay
await service.produceAndPublishAggregates(ad, duties)
proc spawnAttestationTasks(service: AttestationServiceRef,

View File

@ -5,11 +5,6 @@ logScope: service = "block_service"
proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
validator: AttachedValidator) {.async.} =
logScope:
validator = validator.pubKey
slot = slot
wallSlot = currentSlot
let
genesisRoot = vc.beaconGenesis.genesis_validators_root
graffiti =
@ -19,21 +14,26 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
defaultGraffitiBytes()
fork = vc.fork.get()
debug "Publishing block", validator = validator.pubKey,
debug "Publishing block", validator = shortLog(validator),
delay = vc.getDelay(ZeroDuration),
wall_slot = currentSlot,
genesis_root = genesisRoot,
graffiti = graffiti, fork = fork, slot = slot,
wall_slot = currentSlot
try:
let randaoReveal = await validator.genRandaoReveal(fork, genesisRoot, slot)
let beaconBlock =
try:
await vc.produceBlock(slot, randaoReveal, graffiti)
except ValidatorApiError as exc:
error "Unable to retrieve block data", slot = currentSlot,
validator = validator.pubKey
except ValidatorApiError:
error "Unable to retrieve block data", slot = slot,
wall_slot = currentSlot, validator = shortLog(validator)
return
except CatchableError as exc:
error "An unexpected error occurred while getting block data",
err_name = exc.name, err_msg = exc.msg
return
let blockRoot = hash_tree_root(beaconBlock)
var signedBlock = SignedBeaconBlock(message: beaconBlock,
root: hash_tree_root(beaconBlock))
@ -55,28 +55,33 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
try:
await vc.publishBlock(signedBlock)
except ValidatorApiError:
error "Unable to submit block", slot = currentSlot,
validator = validator.pubKey, block_root = blockRoot,
deposits = len(signedBlock.message.body.deposits),
attestations = len(signedBlock.message.body.attestations),
graffiti = graffiti
error "Unable to publish block", blck = shortLog(signedBlock.message),
blockRoot = shortLog(blockRoot),
validator = shortLog(validator),
validator_index = validator.index.get(),
wall_slot = currentSlot
return
except CatchableError as exc:
error "An unexpected error occurred while publishing block",
err_name = exc.name, err_msg = exc.msg
return
if res:
notice "Block published", slot = currentSlot,
validator = validator.pubKey, validator_index = validator.index.get(),
deposits = len(signedBlock.message.body.deposits),
attestations = len(signedBlock.message.body.attestations),
graffiti = graffiti
notice "Block published", blck = shortLog(signedBlock.message),
blockRoot = shortLog(blockRoot), validator = shortLog(validator),
validator_index = validator.index.get()
else:
warn "Block was not accepted by beacon node", slot = currentSlot,
validator = validator.pubKey, validator_index = validator.index.get(),
deposits = len(signedBlock.message.body.deposits),
attestations = len(signedBlock.message.body.attestations),
graffiti = graffiti
warn "Block was not accepted by beacon node",
blck = shortLog(signedBlock.message),
blockRoot = shortLog(blockRoot),
validator = shortLog(validator),
validator_index = validator.index.get(),
wall_slot = currentSlot
else:
warn "Slashing protection activated for block proposal",
slot = currentSlot, validator = validator.pubKey,
blck = shortLog(beaconBlock), blockRoot = shortLog(blockRoot),
validator = shortLog(validator),
validator_index = validator.index.get(),
wall_slot = currentSlot,
existingProposal = notSlashable.error
except CatchableError as exc:
error "Unexpected error happens while proposing block",
@ -87,25 +92,27 @@ proc proposeBlock(vc: ValidatorClientRef, slot: Slot,
let (inFuture, timeToSleep) = vc.beaconClock.fromNow(slot)
try:
if inFuture:
debug "Proposing block", timeIn = timeToSleep, validator = proposerKey
debug "Proposing block", timeIn = timeToSleep,
validator = shortLog(proposerKey)
await sleepAsync(timeToSleep)
else:
debug "Proposing block", timeIn = 0.seconds, validator = proposerKey
debug "Proposing block", timeIn = 0.seconds,
validator = shortLog(proposerKey)
let sres = vc.getCurrentSlot()
if sres.isSome():
let currentSlot = sres.get()
# We need to check that we still have validator in our pool.
let validator = vc.attachedValidators.getValidator(proposerKey)
if isNil(validator):
debug "Validator is not present in pool anymore, exiting",
validator = proposerKey
let validator =
block:
let res = vc.getValidator(proposerKey)
if res.isNone():
return
res.get()
await vc.publishBlock(currentSlot, slot, validator)
except CancelledError:
debug "Proposing task was cancelled", slot = slot, validator = proposerKey
debug "Proposing task was cancelled", slot = slot,
validator = shortLog(proposerKey)
proc spawnProposalTask(vc: ValidatorClientRef,
duty: RestProposerDuty): ProposerTask =
@ -124,15 +131,36 @@ proc contains(data: openArray[ProposerTask], duty: RestProposerDuty): bool =
return true
false
proc checkDuty(duty: RestProposerDuty, epoch: Epoch, slot: Slot): bool =
let lastSlot = compute_start_slot_at_epoch(epoch + 1'u64)
if duty.slot >= slot:
if duty.slot < lastSlot:
true
else:
warn "Block proposal duty is in the far future, ignoring",
duty_slot = duty.slot, validator = shortLog(duty.pubkey),
wall_slot = slot, last_slot_in_epoch = (lastSlot - 1'u64)
false
else:
warn "Block proposal duty is in the past, ignoring", duty_slot = duty.slot,
validator = shortLog(duty.pubkey), wall_slot = slot
false
proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch,
dependentRoot: Eth2Digest,
duties: openArray[RestProposerDuty]) =
let epochDuties = vc.proposers.getOrDefault(epoch)
let default = ProposedData(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64))
let sres = vc.getCurrentSlot()
if sres.isSome():
let
currentSlot = sres.get()
epochDuties = vc.proposers.getOrDefault(epoch, default)
if not(epochDuties.isDefault()):
if epochDuties.dependentRoot != dependentRoot:
warn "Proposer duties re-organization",
warn "Proposer duties re-organization", duties_count = len(duties),
wall_slot = currentSlot, epoch = epoch,
prior_dependent_root = epochDuties.dependentRoot,
dependent_root = dependentRoot
dependent_root = dependentRoot, wall_slot = currentSlot
let tasks =
block:
var res: seq[ProposerTask]
@ -142,18 +170,21 @@ proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch,
if task notin duties:
# Task is no more relevant, so cancel it.
debug "Cancelling running proposal duty task",
slot = task.duty.slot, validator = task.duty.pubkey
slot = task.duty.slot,
validator = shortLog(task.duty.pubkey)
task.future.cancel()
else:
# If task is already running for proper slot, we keep it alive.
debug "Keep running previous proposal duty task",
slot = task.duty.slot, validator = task.duty.pubkey
slot = task.duty.slot,
validator = shortLog(task.duty.pubkey)
res.add(task)
for duty in duties:
if duty notin res:
debug "New proposal duty received", slot = duty.slot,
validator = duty.pubkey
validator = shortLog(duty.pubkey)
if checkDuty(duty, epoch, currentSlot):
let task = vc.spawnProposalTask(duty)
if duty.slot in hashset:
error "Multiple block proposers for this slot, " &
@ -164,6 +195,9 @@ proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch,
res
vc.proposers[epoch] = ProposedData.init(epoch, dependentRoot, tasks)
else:
debug "New block proposal duties received",
dependent_root = dependentRoot, duties_count = len(duties),
wall_slot = currentSlot, epoch = epoch
# Spawn new proposer tasks and modify proposers map.
let tasks =
block:
@ -171,7 +205,8 @@ proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch,
var res: seq[ProposerTask]
for duty in duties:
debug "New proposal duty received", slot = duty.slot,
validator = duty.pubkey
validator = shortLog(duty.pubkey)
if checkDuty(duty, epoch, currentSlot):
let task = vc.spawnProposalTask(duty)
if duty.slot in hashset:
error "Multiple block proposers for this slot, " &

View File

@ -244,3 +244,16 @@ proc getDelay*(vc: ValidatorClientRef, instant: Duration): Duration =
let slotStartTime = currentBeaconTime.slotOrZero().toBeaconTime()
let idealTime = Duration(slotStartTime) + instant
currentTime - idealTime
proc getValidator*(vc: ValidatorClientRef,
key: ValidatorPubkey): Option[AttachedValidator] =
let validator = vc.attachedValidators.getValidator(key)
if isNil(validator):
warn "Validator not in pool anymore", validator = shortLog(validator)
none[AttachedValidator]()
else:
if validator.index.isNone():
warn "Validator index is missing", validator = shortLog(validator)
none[AttachedValidator]()
else:
some(validator)

View File

@ -48,8 +48,12 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} =
let res =
try:
await vc.getValidators(idents)
except ValidatorApiError as exc:
error "Unable to retrieve head state's validator information"
except ValidatorApiError:
error "Unable to get head state's validator information"
return
except CatchableError as exc:
error "Unexpected error occurred while getting validator information",
err_name = exc.name, err_msg = exc.msg
return
for item in res:
@ -94,8 +98,12 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
let res =
try:
await vc.getAttesterDuties(epoch, indices)
except ValidatorApiError as exc:
error "Unable to retrieve attester duties", epoch = epoch
except ValidatorApiError:
error "Unable to get attester duties", epoch = epoch
return 0
except CatchableError as exc:
error "Unexpected error occured while getting attester duties",
epoch = epoch, err_name = exc.name, err_msg = exc.msg
return 0
if currentRoot.isNone():
@ -264,9 +272,13 @@ proc pollForBeaconProposers*(vc: ValidatorClientRef) {.async.} =
else:
debug "No relevant proposer duties received", slot = currentSlot,
duties_count = len(duties)
except ValidatorApiError as exc:
debug "Unable to retrieve proposer duties", slot = currentSlot,
except ValidatorApiError:
debug "Unable to get proposer duties", slot = currentSlot,
epoch = currentEpoch
except CatchableError as exc:
debug "Unexpected error occured while getting proposer duties",
slot = currentSlot, epoch = currentEpoch, err_name = exc.name,
err_msg = exc.msg
vc.pruneBeaconProposers(currentEpoch)

View File

@ -10,6 +10,10 @@ proc pollForFork(vc: ValidatorClientRef) {.async.} =
except ValidatorApiError as exc:
error "Unable to retrieve head state's fork", reason = exc.msg
return
except CatchableError as exc:
error "Unexpected error occured while getting fork information",
err_name = exc.name, err_msg = exc.msg
return
if vc.fork.isNone() or vc.fork.get() != fork:
vc.fork = some(fork)