From d4bafdf5a4540f19d067006f86f64d61946ad79d Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Fri, 15 Jul 2022 00:11:25 +0300 Subject: [PATCH] VC: cancellation hot-fixes. (#3875) * Fix cancellation issues. * Add exitEvent which allows the validator client to shut down gracefully. * Fix firstSuccessTimeout() template. * Fix service names. * Modify waitOnlineNodes to include a timeout parameter. --- beacon_chain/nimbus_validator_client.nim | 16 +- beacon_chain/validator_client/api.nim | 56 ++++-- .../validator_client/attestation_service.nim | 80 +++++--- .../validator_client/block_service.nim | 188 ++++++++++-------- beacon_chain/validator_client/common.nim | 7 +- .../validator_client/duties_service.nim | 96 +++++---- .../validator_client/fallback_service.nim | 45 +++-- .../validator_client/fork_service.nim | 13 +- .../sync_committee_service.nim | 10 +- 9 files changed, 320 insertions(+), 191 deletions(-) diff --git a/beacon_chain/nimbus_validator_client.nim b/beacon_chain/nimbus_validator_client.nim index 11dff3317..83fd30f20 100644 --- a/beacon_chain/nimbus_validator_client.nim +++ b/beacon_chain/nimbus_validator_client.nim @@ -5,7 +5,8 @@ # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. 
import validator_client/[common, fallback_service, duties_service, - attestation_service, fork_service, sync_committee_service] + attestation_service, fork_service, + sync_committee_service] proc initGenesis(vc: ValidatorClientRef): Future[RestGenesis] {.async.} = info "Initializing genesis", nodes_count = len(vc.beaconNodes) @@ -170,9 +171,12 @@ proc asyncRun(vc: ValidatorClientRef) {.async.} = vc.attestationService.start() vc.syncCommitteeService.start() + var exitEventFut = vc.gracefulExit.wait() try: vc.runSlotLoopFut = runSlotLoop(vc, vc.beaconClock.now(), onSlotStart) - await vc.runSlotLoopFut + discard await race(vc.runSlotLoopFut, exitEventFut) + if not(vc.runSlotLoopFut.finished()): + notice "Received shutdown event, exiting" except CancelledError: debug "Main loop interrupted" except CatchableError as exc: @@ -185,10 +189,8 @@ proc asyncRun(vc: ValidatorClientRef) {.async.} = var pending: seq[Future[void]] if not(vc.runSlotLoopFut.finished()): pending.add(vc.runSlotLoopFut.cancelAndWait()) - if not(vc.sigintHandleFut.finished()): - pending.add(vc.sigintHandleFut.cancelAndWait()) - if not(vc.sigtermHandleFut.finished()): - pending.add(vc.sigtermHandleFut.cancelAndWait()) + if not(exitEventFut.finished()): + pending.add(exitEventFut.cancelAndWait()) debug "Stopping running services" pending.add(vc.fallbackService.stop()) pending.add(vc.forkService.stop()) @@ -268,6 +270,7 @@ programMain: graffitiBytes: config.graffiti.get(defaultGraffitiBytes()), nodesAvailable: newAsyncEvent(), forksAvailable: newAsyncEvent(), + gracefulExit: newAsyncEvent(), sigintHandleFut: waitSignal(SIGINT), sigtermHandleFut: waitSignal(SIGTERM) ) @@ -278,6 +281,7 @@ programMain: graffitiBytes: config.graffiti.get(defaultGraffitiBytes()), nodesAvailable: newAsyncEvent(), forksAvailable: newAsyncEvent(), + gracefulExit: newAsyncEvent(), sigintHandleFut: newFuture[void]("sigint_placeholder"), sigtermHandleFut: newFuture[void]("sigterm_placeholder") ) diff --git 
a/beacon_chain/validator_client/api.nim b/beacon_chain/validator_client/api.nim index ac14db872..c5399f69c 100644 --- a/beacon_chain/validator_client/api.nim +++ b/beacon_chain/validator_client/api.nim @@ -25,7 +25,24 @@ template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc, var iterationsCount = 0 while true: - let onlineNodes = vc.onlineNodes() + let onlineNodes = + try: + await vc.waitOnlineNodes(timerFut) + vc.onlineNodes() + except CancelledError as exc: + # `waitOnlineNodes` does not cancel the timeout future. + var default: seq[BeaconNodeServerRef] + if not(isNil(timerFut)) and not(timerFut.finished()): + await timerFut.cancelAndWait() + raise exc + except CatchableError: + # This case should not happen. + var default: seq[BeaconNodeServerRef] + default + + if len(onlineNodes) == 0: + # `onlineNodes` sequence is empty only if operation timeout exceeded. + break if iterationsCount != 0: debug "Request got failed", iterations_count = iterationsCount @@ -44,13 +61,13 @@ template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc, # be able to check errors. await allFutures(bodyFut) ApiOperation.Success - except CancelledError: + except CancelledError as exc: # `allFutures()` could not cancel Futures. - await bodyFut.cancelAndWait() - ApiOperation.Interrupt + if not(bodyFut.finished()): + await bodyFut.cancelAndWait() + raise exc except CatchableError as exc: - # This only could happened if `allFutures()` start raise - # exceptions. + # This case should not happen. ApiOperation.Failure else: try: @@ -60,17 +77,17 @@ template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc, else: await bodyFut.cancelAndWait() ApiOperation.Timeout - except CancelledError: + except CancelledError as exc: # `race()` could not cancel Futures. 
+ var pending: seq[Future[void]] if not(bodyFut.finished()): - if not(timerFut.finished()): - timerFut.cancel() - await allFutures(bodyFut.cancelAndWait(), timerFut) - else: - await cancelAndWait(timerFut) - ApiOperation.Interrupt + pending.add(bodyFut.cancelAndWait()) + if not(isNil(timerFut)) and not(timerFut.finished()): + pending.add(timerFut.cancelAndWait()) + await allFutures(pending) + raise exc except CatchableError as exc: - # This only could happened if `race()` start raise exceptions. + # This case should not happen. ApiOperation.Failure block: @@ -112,8 +129,6 @@ template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc, if exitNow: break - await vc.waitOnlineNodes() - proc getDutyErrorMessage(response: RestPlainResponse): string = let res = decodeBytes(RestDutyError, response.data, response.contentType) @@ -216,7 +231,8 @@ proc getSyncCommitteeDuties*( validators: seq[ValidatorIndex] ): Future[GetSyncCommitteeDutiesResponse] {.async.} = logScope: request = "getSyncCommitteeDuties" - vc.firstSuccessTimeout(RestResponse[GetSyncCommitteeDutiesResponse], SlotDuration, + vc.firstSuccessTimeout(RestResponse[GetSyncCommitteeDutiesResponse], + SlotDuration, getSyncCommitteeDuties(it, epoch, validators)): if apiResponse.isErr(): debug "Unable to retrieve sync committee duties", endpoint = node, @@ -466,7 +482,8 @@ proc submitPoolSyncCommitteeSignature*( let response = apiResponse.get() case response.status of 200: - debug "Sync committee message was successfully published", endpoint = node + debug "Sync committee message was successfully published", + endpoint = node return true of 400: debug "Received invalid request response", @@ -484,7 +501,8 @@ proc submitPoolSyncCommitteeSignature*( response_error = response.getDutyErrorMessage() RestBeaconNodeStatus.Offline - raise newException(ValidatorApiError, "Unable to submit sync committee message") + raise newException(ValidatorApiError, + "Unable to submit sync committee message") proc 
getAggregatedAttestation*( vc: ValidatorClientRef, diff --git a/beacon_chain/validator_client/attestation_service.nim b/beacon_chain/validator_client/attestation_service.nim index 3538af488..e29e41533 100644 --- a/beacon_chain/validator_client/attestation_service.nim +++ b/beacon_chain/validator_client/attestation_service.nim @@ -9,7 +9,10 @@ import std/sets import chronicles import "."/[common, api, block_service] -logScope: service = "attestation_service" +const + ServiceName = "attestation_service" + +logScope: service = ServiceName type AggregateItem* = object @@ -48,14 +51,22 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData, return false let attestation = block: - let signature = block: - let res = await validator.getAttestationSignature( - fork, vc.beaconGenesis.genesis_validators_root, adata) - if res.isErr(): - error "Unable to sign attestation", validator = shortLog(validator), - error_msg = res.error() + let signature = + try: + let res = await validator.getAttestationSignature( + fork, vc.beaconGenesis.genesis_validators_root, adata) + if res.isErr(): + error "Unable to sign attestation", validator = shortLog(validator), + error_msg = res.error() + return false + res.get() + except CancelledError as exc: + debug "Attestation signature process was interrupted" + raise exc + except CatchableError as exc: + error "An unexpected error occurred while signing attestation", + err_name = exc.name, err_msg = exc.msg return false - res.get() Attestation.init( [duty.data.validator_committee_index], @@ -76,9 +87,9 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData, validator = shortLog(validator), validator_index = vindex return false - except CancelledError: - debug "Publish attestation request was interrupted" - return false + except CancelledError as exc: + debug "Attestation publishing process was interrupted" + raise exc except CatchableError as exc: error "Unexpected error occured while publishing 
attestation", attestation = shortLog(attestation), @@ -110,14 +121,15 @@ proc serveAggregateAndProof*(service: AttestationServiceRef, genesisRoot = vc.beaconGenesis.genesis_validators_root slot = proof.aggregate.data.slot fork = vc.forkAtEpoch(slot.epoch) + vindex = validator.index.get() debug "Signing aggregate", validator = shortLog(validator), attestation = shortLog(proof.aggregate), fork = fork let signature = - block: - let res = await getAggregateAndProofSignature( - validator, fork, genesisRoot, proof) + try: + let res = await validator.getAggregateAndProofSignature( + fork, genesisRoot, proof) if res.isErr(): error "Unable to sign aggregate and proof using remote signer", validator = shortLog(validator), @@ -125,11 +137,20 @@ proc serveAggregateAndProof*(service: AttestationServiceRef, error_msg = res.error() return false res.get() + except CancelledError as exc: + debug "Aggregated attestation signing process was interrupted" + raise exc + except CatchableError as exc: + error "Unexpected error occured while signing aggregated attestation", + validator = shortLog(validator), + attestation = shortLog(proof.aggregate), + validator_index = vindex, + err_name = exc.name, err_msg = exc.msg + return false + let signedProof = SignedAggregateAndProof(message: proof, signature: signature) - let vindex = validator.index.get() - debug "Sending aggregated attestation", fork = fork, attestation = shortLog(signedProof.message.aggregate), validator = shortLog(validator), validator_index = vindex, @@ -144,9 +165,9 @@ proc serveAggregateAndProof*(service: AttestationServiceRef, validator = shortLog(validator), validator_index = vindex return false - except CancelledError: + except CancelledError as exc: debug "Publish aggregate and proofs request was interrupted" - return false + raise exc except CatchableError as exc: error "Unexpected error occured while publishing aggregated attestation", attestation = shortLog(signedProof.message.aggregate), @@ -199,11 +220,12 @@ proc 
produceAndPublishAttestations*(service: AttestationServiceRef, var errored, succeed, failed = 0 try: await allFutures(pendingAttestations) - except CancelledError: + except CancelledError as exc: for fut in pendingAttestations: if not(fut.finished()): fut.cancel() await allFutures(pendingAttestations) + raise exc for future in pendingAttestations: if future.done(): @@ -263,9 +285,9 @@ proc produceAndPublishAggregates(service: AttestationServiceRef, error "Unable to get aggregated attestation data", slot = slot, attestation_root = shortLog(attestationRoot) return - except CancelledError: + except CancelledError as exc: debug "Aggregated attestation request was interrupted" - return + raise exc except CatchableError as exc: error "Unexpected error occured while getting aggregated attestation", slot = slot, attestation_root = shortLog(attestationRoot), @@ -289,11 +311,12 @@ proc produceAndPublishAggregates(service: AttestationServiceRef, var errored, succeed, failed = 0 try: await allFutures(pendingAggregates) - except CancelledError: + except CancelledError as exc: for fut in pendingAggregates: if not(fut.finished()): fut.cancel() await allFutures(pendingAggregates) + raise exc for future in pendingAggregates: if future.done(): @@ -327,9 +350,9 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef, await vc.waitForBlockPublished(slot).wait(nanoseconds(timeout.nanoseconds)) let dur = Moment.now() - startTime debug "Block proposal awaited", slot = slot, duration = dur - except CancelledError: + except CancelledError as exc: debug "Block proposal waiting was interrupted" - return + raise exc except AsyncTimeoutError: let dur = Moment.now() - startTime debug "Block was not produced in time", slot = slot, duration = dur @@ -346,9 +369,9 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef, error "Unable to proceed attestations", slot = slot, committee_index = committee_index, duties_count = len(duties) return - except CancelledError: + 
except CancelledError as exc: debug "Publish attestation request was interrupted" - return + raise exc except CatchableError as exc: error "Unexpected error while producing attestations", slot = slot, committee_index = committee_index, duties_count = len(duties), @@ -414,9 +437,10 @@ proc mainLoop(service: AttestationServiceRef) {.async.} = proc init*(t: typedesc[AttestationServiceRef], vc: ValidatorClientRef): Future[AttestationServiceRef] {.async.} = - debug "Initializing service" - let res = AttestationServiceRef(name: "attestation_service", + logScope: service = ServiceName + let res = AttestationServiceRef(name: ServiceName, client: vc, state: ServiceState.Initialized) + debug "Initializing service" return res proc start*(service: AttestationServiceRef) = diff --git a/beacon_chain/validator_client/block_service.nim b/beacon_chain/validator_client/block_service.nim index f08de13de..026704509 100644 --- a/beacon_chain/validator_client/block_service.nim +++ b/beacon_chain/validator_client/block_service.nim @@ -21,92 +21,107 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot, genesis_root = genesisRoot, graffiti = graffiti, fork = fork, slot = slot, wall_slot = currentSlot - try: - let randaoReveal = - block: - let res = await validator.getEpochSignature( - fork, genesisRoot, slot.epoch) + let randaoReveal = + try: + let res = await validator.getEpochSignature(fork, genesisRoot, slot.epoch) + if res.isErr(): + error "Unable to generate randao reveal usint remote signer", + validator = shortLog(validator), error_msg = res.error() + return + res.get() + except CancelledError as exc: + error "Randao reveal processing was interrupted" + raise exc + except CatchableError as exc: + error "An unexpected error occurred while receiving randao data", + err_name = exc.name, err_msg = exc.msg + return + + let beaconBlock = + try: + await vc.produceBlockV2(slot, randaoReveal, graffiti) + except ValidatorApiError: + error "Unable to retrieve block data", slot 
= slot, + wall_slot = currentSlot, validator = shortLog(validator) + return + except CancelledError as exc: + error "Producing block processing was interrupted" + raise exc + except CatchableError as exc: + error "An unexpected error occurred while getting block data", + err_name = exc.name, err_msg = exc.msg + return + + let blockRoot = withBlck(beaconBlock): hash_tree_root(blck) + # TODO: signing_root is recomputed in getBlockSignature just after + let signing_root = compute_block_signing_root(fork, genesisRoot, slot, + blockRoot) + let notSlashable = vc.attachedValidators + .slashingProtection + .registerBlock(ValidatorIndex(beaconBlock.proposer_index), + validator.pubkey, slot, signing_root) + + if notSlashable.isOk(): + let signature = + try: + let res = await validator.getBlockSignature(fork, genesisRoot, + slot, blockRoot, + beaconBlock) if res.isErr(): - error "Unable to generate randao reveal usint remote signer", + error "Unable to sign block proposal using remote signer", validator = shortLog(validator), error_msg = res.error() return res.get() - - let beaconBlock = - try: - await vc.produceBlockV2(slot, randaoReveal, graffiti) - except ValidatorApiError: - error "Unable to retrieve block data", slot = slot, - wall_slot = currentSlot, validator = shortLog(validator) - return + except CancelledError as exc: + debug "Block signature processing was interrupted" + raise exc except CatchableError as exc: - error "An unexpected error occurred while getting block data", + error "An unexpected error occurred while signing block", + err_name = exc.name, err_msg = exc.msg + return + + debug "Sending block", + blockRoot = shortLog(blockRoot), blck = shortLog(beaconBlock), + signature = shortLog(signature), validator = shortLog(validator) + + let res = + try: + let signedBlock = ForkedSignedBeaconBlock.init(beaconBlock, blockRoot, + signature) + await vc.publishBlock(signedBlock) + except ValidatorApiError: + error "Unable to publish block", + blockRoot = 
shortLog(blockRoot), + blck = shortLog(beaconBlock), + signature = shortLog(signature), + validator = shortLog(validator), + validator_index = validator.index.get(), + wall_slot = currentSlot + return + except CancelledError as exc: + debug "Publishing block processing was interrupted" + raise exc + except CatchableError as exc: + error "An unexpected error occurred while publishing block", err_name = exc.name, err_msg = exc.msg return - - let blockRoot = withBlck(beaconBlock): hash_tree_root(blck) - # TODO: signing_root is recomputed in getBlockSignature just after - let signing_root = compute_block_signing_root(fork, genesisRoot, slot, - blockRoot) - let notSlashable = vc.attachedValidators - .slashingProtection - .registerBlock(ValidatorIndex(beaconBlock.proposer_index), - validator.pubkey, slot, signing_root) - - if notSlashable.isOk(): - let signature = - block: - let res = await validator.getBlockSignature(fork, genesisRoot, - slot, blockRoot, - beaconBlock) - if res.isErr(): - error "Unable to sign block proposal using remote signer", - validator = shortLog(validator), error_msg = res.error() - return - res.get() - - debug "Sending block", - blockRoot = shortLog(blockRoot), blck = shortLog(beaconBlock), - signature = shortLog(signature), validator = shortLog(validator) - - let res = - try: - let signedBlock = ForkedSignedBeaconBlock.init(beaconBlock, blockRoot, - signature) - await vc.publishBlock(signedBlock) - except ValidatorApiError: - error "Unable to publish block", - blockRoot = shortLog(blockRoot), - blck = shortLog(beaconBlock), - signature = shortLog(signature), - validator = shortLog(validator), - validator_index = validator.index.get(), - wall_slot = currentSlot - return - except CatchableError as exc: - error "An unexpected error occurred while publishing block", - err_name = exc.name, err_msg = exc.msg - return - if res: - notice "Block published", blockRoot = shortLog(blockRoot), - blck = shortLog(beaconBlock), signature = shortLog(signature), 
- validator = shortLog(validator) - else: - warn "Block was not accepted by beacon node", - blockRoot = shortLog(blockRoot), - blck = shortLog(beaconBlock), - signature = shortLog(signature), - validator = shortLog(validator), - wall_slot = currentSlot + if res: + notice "Block published", blockRoot = shortLog(blockRoot), + blck = shortLog(beaconBlock), signature = shortLog(signature), + validator = shortLog(validator) else: - warn "Slashing protection activated for block proposal", - blockRoot = shortLog(blockRoot), blck = shortLog(beaconBlock), + warn "Block was not accepted by beacon node", + blockRoot = shortLog(blockRoot), + blck = shortLog(beaconBlock), + signature = shortLog(signature), validator = shortLog(validator), - wall_slot = currentSlot, - existingProposal = notSlashable.error - except CatchableError as exc: - error "Unexpected error happens while proposing block", - error_name = exc.name, error_msg = exc.msg + wall_slot = currentSlot + else: + warn "Slashing protection activated for block proposal", + blockRoot = shortLog(blockRoot), blck = shortLog(beaconBlock), + validator = shortLog(validator), + wall_slot = currentSlot, + existingProposal = notSlashable.error proc proposeBlock(vc: ValidatorClientRef, slot: Slot, proposerKey: ValidatorPubKey) {.async.} = @@ -130,10 +145,10 @@ proc proposeBlock(vc: ValidatorClientRef, slot: Slot, return res.get() await vc.publishBlock(currentSlot, slot, validator) - - except CancelledError: - debug "Proposing task was cancelled", slot = slot, - validator = shortLog(proposerKey) + except CancelledError as exc: + debug "Block proposing was interrupted", slot = slot, + validator = shortLog(proposerKey) + raise exc proc spawnProposalTask(vc: ValidatorClientRef, duty: RestProposerDuty): ProposerTask = @@ -251,4 +266,13 @@ proc waitForBlockPublished*(vc: ValidatorClientRef, slot: Slot) {.async.} = res.add(task.future) res if len(pendingTasks) > 0: - await allFutures(pendingTasks) + try: + await allFutures(pendingTasks) 
+ except CancelledError as exc: + var pending: seq[Future[void]] + for future in pendingTasks: + if not(future.finished()): + pending.add(future.cancelAndWait()) + await allFutures(pending) + raise exc + diff --git a/beacon_chain/validator_client/common.nim b/beacon_chain/validator_client/common.nim index 169058ed4..b41b63894 100644 --- a/beacon_chain/validator_client/common.nim +++ b/beacon_chain/validator_client/common.nim @@ -120,7 +120,6 @@ type config*: ValidatorClientConf graffitiBytes*: GraffitiBytes beaconNodes*: seq[BeaconNodeServerRef] - nodesAvailable*: AsyncEvent fallbackService*: FallbackServiceRef forkService*: ForkServiceRef dutiesService*: DutiesServiceRef @@ -134,6 +133,8 @@ type attachedValidators*: ValidatorPool forks*: seq[Fork] forksAvailable*: AsyncEvent + nodesAvailable*: AsyncEvent + gracefulExit*: AsyncEvent attesters*: AttesterMap proposers*: ProposerMap syncCommitteeDuties*: SyncCommitteeDutiesMap @@ -173,13 +174,13 @@ chronicles.expandIt(RestAttesterDuty): validator_committee_index = it.validator_committee_index proc stop*(csr: ClientServiceRef) {.async.} = - debug "Stopping service", service_name = csr.name + debug "Stopping service", service = csr.name if csr.state == ServiceState.Running: csr.state = ServiceState.Closing if not(csr.lifeFut.finished()): await csr.lifeFut.cancelAndWait() csr.state = ServiceState.Closed - debug "Service stopped", service_name = csr.name + debug "Service stopped", service = csr.name proc isDefault*(dap: DutyAndProof): bool = dap.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64) diff --git a/beacon_chain/validator_client/duties_service.nim b/beacon_chain/validator_client/duties_service.nim index 6921a64a1..3dc89dc95 100644 --- a/beacon_chain/validator_client/duties_service.nim +++ b/beacon_chain/validator_client/duties_service.nim @@ -2,7 +2,10 @@ import std/[sets, sequtils] import chronicles import common, api, block_service -logScope: service = "duties_service" +const + ServiceName = "duties_service" + 
+logScope: service = ServiceName type DutiesServiceLoop* = enum @@ -54,9 +57,9 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} = except ValidatorApiError: error "Unable to get head state's validator information" return - except CancelledError: - debug "Validator's indices request was interrupted" - return + except CancelledError as exc: + debug "Validator's indices processing was interrupted" + raise exc except CatchableError as exc: error "Unexpected error occurred while getting validator information", err_name = exc.name, err_msg = exc.msg @@ -110,9 +113,9 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef, except ValidatorApiError: error "Unable to get attester duties", epoch = epoch return 0 - except CancelledError: - debug "Attester duties request was interrupted" - return 0 + except CancelledError as exc: + debug "Attester duties processing was interrupted" + raise exc except CatchableError as exc: error "Unexpected error occured while getting attester duties", epoch = epoch, err_name = exc.name, err_msg = exc.msg @@ -164,19 +167,27 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef, res if len(addOrReplaceItems) > 0: - var pending: seq[Future[SignatureResult]] + var pendingRequests: seq[Future[SignatureResult]] var validators: seq[AttachedValidator] for item in addOrReplaceItems: let validator = vc.attachedValidators.getValidator(item.duty.pubkey) let fork = vc.forkAtEpoch(item.duty.slot.epoch) let future = validator.getSlotSignature( fork, genesisRoot, item.duty.slot) - pending.add(future) + pendingRequests.add(future) validators.add(validator) - await allFutures(pending) + try: + await allFutures(pendingRequests) + except CancelledError as exc: + var pendingCancel: seq[Future[void]] + for future in pendingRequests: + if not(future.finished()): + pendingCancel.add(future.cancelAndWait()) + await allFutures(pendingCancel) + raise exc - for index, fut in pending: + for index, fut in pendingRequests: let item = addOrReplaceItems[index] 
let dap = if fut.done(): @@ -219,9 +230,9 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef, except ValidatorApiError: error "Unable to get sync committee duties", epoch = epoch return 0 - except CancelledError: - debug "Request for sync committee duties was interrupted" - return 0 + except CancelledError as exc: + debug "Sync committee duties processing was interrupted" + raise exc except CatchableError as exc: error "Unexpected error occurred while getting sync committee duties", epoch = epoch, err_name = exc.name, err_msg = exc.msg @@ -261,7 +272,7 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef, res if len(addOrReplaceItems) > 0: - var pending: seq[Future[SignatureResult]] + var pendingRequests: seq[Future[SignatureResult]] var validators: seq[AttachedValidator] let sres = vc.getCurrentSlot() if sres.isSome(): @@ -272,12 +283,20 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef, genesisRoot, sres.get(), getSubcommitteeIndex(item.duty.validator_sync_committee_index)) - pending.add(future) + pendingRequests.add(future) validators.add(validator) - await allFutures(pending) + try: + await allFutures(pendingRequests) + except CancelledError as exc: + var pendingCancel: seq[Future[void]] + for future in pendingRequests: + if not(future.finished()): + pendingCancel.add(future.cancelAndWait()) + await allFutures(pendingCancel) + raise exc - for index, fut in pending: + for index, fut in pendingRequests: let item = addOrReplaceItems[index] let dap = if fut.done(): @@ -442,8 +461,9 @@ proc pollForBeaconProposers*(vc: ValidatorClientRef) {.async.} = except ValidatorApiError: debug "Unable to get proposer duties", slot = currentSlot, epoch = currentEpoch - except CancelledError: - debug "Proposer duties request was interrupted" + except CancelledError as exc: + debug "Proposer duties processing was interrupted" + raise exc except CatchableError as exc: debug "Unexpected error occured while getting proposer duties", slot = currentSlot, epoch = 
currentEpoch, err_name = exc.name, @@ -512,10 +532,10 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = debug "Service started" var - fut1 = service.attesterDutiesLoop() - fut2 = service.proposerDutiesLoop() - fut3 = service.validatorIndexLoop() - fut4 = service.syncCommitteeeDutiesLoop() + attestFut = service.attesterDutiesLoop() + proposeFut = service.proposerDutiesLoop() + indicesFut = service.validatorIndexLoop() + syncFut = service.syncCommitteeeDutiesLoop() while true: # This loop could look much more nicer/better, when @@ -523,20 +543,25 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = # become safe to combine loops, breaks and exception handlers. let breakLoop = try: - discard await race(fut1, fut2, fut3, fut4) - checkAndRestart(AttesterLoop, fut1, service.attesterDutiesLoop()) - checkAndRestart(ProposerLoop, fut2, service.proposerDutiesLoop()) - checkAndRestart(IndicesLoop, fut3, service.validatorIndexLoop()) + discard await race(attestFut, proposeFut, indicesFut, syncFut) + checkAndRestart(AttesterLoop, attestFut, service.attesterDutiesLoop()) + checkAndRestart(ProposerLoop, proposeFut, service.proposerDutiesLoop()) + checkAndRestart(IndicesLoop, indicesFut, service.validatorIndexLoop()) checkAndRestart(SyncCommitteeLoop, - fut4, service.syncCommitteeeDutiesLoop()) + syncFut, service.syncCommitteeeDutiesLoop()) false except CancelledError: - if not(fut1.finished()): fut1.cancel() - if not(fut2.finished()): fut2.cancel() - if not(fut3.finished()): fut3.cancel() - if not(fut4.finished()): fut4.cancel() - await allFutures(fut1, fut2, fut3, fut4) debug "Service interrupted" + var pending: seq[Future[void]] + if not(attestFut.finished()): + pending.add(attestFut.cancelAndWait()) + if not(proposeFut.finished()): + pending.add(proposeFut.cancelAndWait()) + if not(indicesFut.finished()): + pending.add(indicesFut.cancelAndWait()) + if not(syncFut.finished()): + pending.add(syncFut.cancelAndWait()) + await allFutures(pending) true except CatchableError 
as exc: warn "Service crashed with unexpected error", err_name = exc.name, @@ -548,7 +573,8 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = proc init*(t: typedesc[DutiesServiceRef], vc: ValidatorClientRef): Future[DutiesServiceRef] {.async.} = - let res = DutiesServiceRef(name: "duties_service", + logScope: service = ServiceName + let res = DutiesServiceRef(name: ServiceName, client: vc, state: ServiceState.Initialized) debug "Initializing service" # We query for indices first, to avoid empty queries for duties. diff --git a/beacon_chain/validator_client/fallback_service.nim b/beacon_chain/validator_client/fallback_service.nim index f7f5e0eef..60eec6fe9 100644 --- a/beacon_chain/validator_client/fallback_service.nim +++ b/beacon_chain/validator_client/fallback_service.nim @@ -1,6 +1,9 @@ import common -logScope: service = "fallback_service" +const + ServiceName = "fallback_service" + +logScope: service = ServiceName type BeaconNodesCounters* = object @@ -38,7 +41,8 @@ proc getNodeCounts*(vc: ValidatorClientRef): BeaconNodesCounters = inc(res.online) res -proc waitOnlineNodes*(vc: ValidatorClientRef) {.async.} = +proc waitOnlineNodes*(vc: ValidatorClientRef, + timeoutFut: Future[void] = nil) {.async.} = doAssert(not(isNil(vc.fallbackService))) while true: if vc.onlineNodesCount() != 0: @@ -50,7 +54,26 @@ proc waitOnlineNodes*(vc: ValidatorClientRef) {.async.} = online_nodes = vc.onlineNodesCount(), unusable_nodes = vc.unusableNodesCount(), total_nodes = len(vc.beaconNodes) - await vc.fallbackService.onlineEvent.wait() + if isNil(timeoutFut): + await vc.fallbackService.onlineEvent.wait() + else: + let breakLoop = + block: + let waitFut = vc.fallbackService.onlineEvent.wait() + try: + discard await race(waitFut, timeoutFut) + except CancelledError as exc: + if not(waitFut.finished()): + await waitFut.cancelAndWait() + raise exc + + if not(waitFut.finished()): + await waitFut.cancelAndWait() + true + else: + false + if breakLoop: + break proc 
checkCompatible(vc: ValidatorClientRef, node: BeaconNodeServerRef) {.async.} = @@ -207,13 +230,10 @@ proc checkNodes*(service: FallbackServiceRef) {.async.} = try: await allFutures(pendingChecks) except CancelledError as exc: - let pending = - block: - var res: seq[Future[void]] - for fut in pendingChecks: - if not(fut.finished()): - res.add(fut.cancelAndWait()) - res + var pending: seq[Future[void]] + for future in pendingChecks: + if not(future.finished()): + pending.add(future.cancelAndWait()) await allFutures(pending) raise exc @@ -255,10 +275,11 @@ proc mainLoop(service: FallbackServiceRef) {.async.} = proc init*(t: typedesc[FallbackServiceRef], vc: ValidatorClientRef): Future[FallbackServiceRef] {.async.} = - debug "Initializing service" - var res = FallbackServiceRef(name: "fallback_service", client: vc, + logScope: service = ServiceName + var res = FallbackServiceRef(name: ServiceName, client: vc, state: ServiceState.Initialized, onlineEvent: newAsyncEvent()) + debug "Initializing service" # Perform initial nodes check. await res.checkNodes() return res diff --git a/beacon_chain/validator_client/fork_service.nim b/beacon_chain/validator_client/fork_service.nim index 4a922cb4f..6b4f029b9 100644 --- a/beacon_chain/validator_client/fork_service.nim +++ b/beacon_chain/validator_client/fork_service.nim @@ -2,7 +2,10 @@ import std/algorithm import chronicles import common, api -logScope: service = "fork_service" +const + ServiceName = "fork_service" + +logScope: service = ServiceName proc validateForkSchedule(forks: openArray[Fork]): bool {.raises: [Defect].} = # Check if `forks` list is linked list. 
@@ -45,6 +48,9 @@ proc pollForFork(vc: ValidatorClientRef) {.async.} = except ValidatorApiError as exc: error "Unable to retrieve fork schedule", reason = exc.msg return + except CancelledError as exc: + debug "Fork retrieval process was interrupted" + raise exc except CatchableError as exc: error "Unexpected error occured while getting fork information", err_name = exc.name, err_msg = exc.msg @@ -96,9 +102,10 @@ proc mainLoop(service: ForkServiceRef) {.async.} = proc init*(t: typedesc[ForkServiceRef], vc: ValidatorClientRef): Future[ForkServiceRef] {.async.} = - debug "Initializing service" - let res = ForkServiceRef(name: "fork_service", + logScope: service = ServiceName + let res = ForkServiceRef(name: ServiceName, client: vc, state: ServiceState.Initialized) + debug "Initializing service" await vc.pollForFork() return res diff --git a/beacon_chain/validator_client/sync_committee_service.nim b/beacon_chain/validator_client/sync_committee_service.nim index 2aaa7d018..b81648613 100644 --- a/beacon_chain/validator_client/sync_committee_service.nim +++ b/beacon_chain/validator_client/sync_committee_service.nim @@ -12,7 +12,10 @@ import ../spec/datatypes/[phase0, altair, bellatrix], ../spec/eth2_apis/rest_types -logScope: service = "sync_committee_service" +const + ServiceName = "sync_committee_service" + +logScope: service = ServiceName type ContributionItem* = object @@ -399,9 +402,10 @@ proc mainLoop(service: SyncCommitteeServiceRef) {.async.} = proc init*(t: typedesc[SyncCommitteeServiceRef], vc: ValidatorClientRef): Future[SyncCommitteeServiceRef] {.async.} = - debug "Initializing service" - let res = SyncCommitteeServiceRef(name: "sync_committee_service", + logScope: service = ServiceName + let res = SyncCommitteeServiceRef(name: ServiceName, client: vc, state: ServiceState.Initialized) + debug "Initializing service" return res proc start*(service: SyncCommitteeServiceRef) =