VC: cancellation hot-fixes. (#3875)

* Fix cancellation issues.
* Add exitEvent, which allows the validator client to shut down gracefully.
* Fix firstSuccessTimeout() template.
* Fix service names.
* Modify waitOnlineNodes to include timeout parameter.
This commit is contained in:
Eugene Kabanov 2022-07-15 00:11:25 +03:00 committed by GitHub
parent a517e8718c
commit d4bafdf5a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 320 additions and 191 deletions

View File

@ -5,7 +5,8 @@
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms. # at your option. This file may not be copied, modified, or distributed except according to those terms.
import validator_client/[common, fallback_service, duties_service, import validator_client/[common, fallback_service, duties_service,
attestation_service, fork_service, sync_committee_service] attestation_service, fork_service,
sync_committee_service]
proc initGenesis(vc: ValidatorClientRef): Future[RestGenesis] {.async.} = proc initGenesis(vc: ValidatorClientRef): Future[RestGenesis] {.async.} =
info "Initializing genesis", nodes_count = len(vc.beaconNodes) info "Initializing genesis", nodes_count = len(vc.beaconNodes)
@ -170,9 +171,12 @@ proc asyncRun(vc: ValidatorClientRef) {.async.} =
vc.attestationService.start() vc.attestationService.start()
vc.syncCommitteeService.start() vc.syncCommitteeService.start()
var exitEventFut = vc.gracefulExit.wait()
try: try:
vc.runSlotLoopFut = runSlotLoop(vc, vc.beaconClock.now(), onSlotStart) vc.runSlotLoopFut = runSlotLoop(vc, vc.beaconClock.now(), onSlotStart)
await vc.runSlotLoopFut discard await race(vc.runSlotLoopFut, exitEventFut)
if not(vc.runSlotLoopFut.finished()):
notice "Received shutdown event, exiting"
except CancelledError: except CancelledError:
debug "Main loop interrupted" debug "Main loop interrupted"
except CatchableError as exc: except CatchableError as exc:
@ -185,10 +189,8 @@ proc asyncRun(vc: ValidatorClientRef) {.async.} =
var pending: seq[Future[void]] var pending: seq[Future[void]]
if not(vc.runSlotLoopFut.finished()): if not(vc.runSlotLoopFut.finished()):
pending.add(vc.runSlotLoopFut.cancelAndWait()) pending.add(vc.runSlotLoopFut.cancelAndWait())
if not(vc.sigintHandleFut.finished()): if not(exitEventFut.finished()):
pending.add(vc.sigintHandleFut.cancelAndWait()) pending.add(exitEventFut.cancelAndWait())
if not(vc.sigtermHandleFut.finished()):
pending.add(vc.sigtermHandleFut.cancelAndWait())
debug "Stopping running services" debug "Stopping running services"
pending.add(vc.fallbackService.stop()) pending.add(vc.fallbackService.stop())
pending.add(vc.forkService.stop()) pending.add(vc.forkService.stop())
@ -268,6 +270,7 @@ programMain:
graffitiBytes: config.graffiti.get(defaultGraffitiBytes()), graffitiBytes: config.graffiti.get(defaultGraffitiBytes()),
nodesAvailable: newAsyncEvent(), nodesAvailable: newAsyncEvent(),
forksAvailable: newAsyncEvent(), forksAvailable: newAsyncEvent(),
gracefulExit: newAsyncEvent(),
sigintHandleFut: waitSignal(SIGINT), sigintHandleFut: waitSignal(SIGINT),
sigtermHandleFut: waitSignal(SIGTERM) sigtermHandleFut: waitSignal(SIGTERM)
) )
@ -278,6 +281,7 @@ programMain:
graffitiBytes: config.graffiti.get(defaultGraffitiBytes()), graffitiBytes: config.graffiti.get(defaultGraffitiBytes()),
nodesAvailable: newAsyncEvent(), nodesAvailable: newAsyncEvent(),
forksAvailable: newAsyncEvent(), forksAvailable: newAsyncEvent(),
gracefulExit: newAsyncEvent(),
sigintHandleFut: newFuture[void]("sigint_placeholder"), sigintHandleFut: newFuture[void]("sigint_placeholder"),
sigtermHandleFut: newFuture[void]("sigterm_placeholder") sigtermHandleFut: newFuture[void]("sigterm_placeholder")
) )

View File

@ -25,7 +25,24 @@ template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc,
var iterationsCount = 0 var iterationsCount = 0
while true: while true:
let onlineNodes = vc.onlineNodes() let onlineNodes =
try:
await vc.waitOnlineNodes(timerFut)
vc.onlineNodes()
except CancelledError as exc:
# waitOnlineNodes does not cancel the timeout future.
var default: seq[BeaconNodeServerRef]
if not(isNil(timerFut)) and not(timerFut.finished()):
await timerFut.cancelAndWait()
raise exc
except CatchableError:
# This case should never happen.
var default: seq[BeaconNodeServerRef]
default
if len(onlineNodes) == 0:
# The `onlineNodes` sequence is empty only if the operation timeout was exceeded.
break
if iterationsCount != 0: if iterationsCount != 0:
debug "Request got failed", iterations_count = iterationsCount debug "Request got failed", iterations_count = iterationsCount
@ -44,13 +61,13 @@ template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc,
# be able to check errors. # be able to check errors.
await allFutures(bodyFut) await allFutures(bodyFut)
ApiOperation.Success ApiOperation.Success
except CancelledError: except CancelledError as exc:
# `allFutures()` could not cancel Futures. # `allFutures()` could not cancel Futures.
if not(bodyFut.finished()):
await bodyFut.cancelAndWait() await bodyFut.cancelAndWait()
ApiOperation.Interrupt raise exc
except CatchableError as exc: except CatchableError as exc:
# This only could happened if `allFutures()` start raise # This case could not be happened.
# exceptions.
ApiOperation.Failure ApiOperation.Failure
else: else:
try: try:
@ -60,17 +77,17 @@ template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc,
else: else:
await bodyFut.cancelAndWait() await bodyFut.cancelAndWait()
ApiOperation.Timeout ApiOperation.Timeout
except CancelledError: except CancelledError as exc:
# `race()` could not cancel Futures. # `race()` could not cancel Futures.
var pending: seq[Future[void]]
if not(bodyFut.finished()): if not(bodyFut.finished()):
if not(timerFut.finished()): pending.add(bodyFut.cancelAndWait())
timerFut.cancel() if not(isNil(timerFut)) and not(timerFut.finished()):
await allFutures(bodyFut.cancelAndWait(), timerFut) pending.add(timerFut.cancelAndWait())
else: await allFutures(pending)
await cancelAndWait(timerFut) raise exc
ApiOperation.Interrupt
except CatchableError as exc: except CatchableError as exc:
# This only could happened if `race()` start raise exceptions. # This case should not happen.
ApiOperation.Failure ApiOperation.Failure
block: block:
@ -112,8 +129,6 @@ template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc,
if exitNow: if exitNow:
break break
await vc.waitOnlineNodes()
proc getDutyErrorMessage(response: RestPlainResponse): string = proc getDutyErrorMessage(response: RestPlainResponse): string =
let res = decodeBytes(RestDutyError, response.data, let res = decodeBytes(RestDutyError, response.data,
response.contentType) response.contentType)
@ -216,7 +231,8 @@ proc getSyncCommitteeDuties*(
validators: seq[ValidatorIndex] validators: seq[ValidatorIndex]
): Future[GetSyncCommitteeDutiesResponse] {.async.} = ): Future[GetSyncCommitteeDutiesResponse] {.async.} =
logScope: request = "getSyncCommitteeDuties" logScope: request = "getSyncCommitteeDuties"
vc.firstSuccessTimeout(RestResponse[GetSyncCommitteeDutiesResponse], SlotDuration, vc.firstSuccessTimeout(RestResponse[GetSyncCommitteeDutiesResponse],
SlotDuration,
getSyncCommitteeDuties(it, epoch, validators)): getSyncCommitteeDuties(it, epoch, validators)):
if apiResponse.isErr(): if apiResponse.isErr():
debug "Unable to retrieve sync committee duties", endpoint = node, debug "Unable to retrieve sync committee duties", endpoint = node,
@ -466,7 +482,8 @@ proc submitPoolSyncCommitteeSignature*(
let response = apiResponse.get() let response = apiResponse.get()
case response.status case response.status
of 200: of 200:
debug "Sync committee message was successfully published", endpoint = node debug "Sync committee message was successfully published",
endpoint = node
return true return true
of 400: of 400:
debug "Received invalid request response", debug "Received invalid request response",
@ -484,7 +501,8 @@ proc submitPoolSyncCommitteeSignature*(
response_error = response.getDutyErrorMessage() response_error = response.getDutyErrorMessage()
RestBeaconNodeStatus.Offline RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to submit sync committee message") raise newException(ValidatorApiError,
"Unable to submit sync committee message")
proc getAggregatedAttestation*( proc getAggregatedAttestation*(
vc: ValidatorClientRef, vc: ValidatorClientRef,

View File

@ -9,7 +9,10 @@ import std/sets
import chronicles import chronicles
import "."/[common, api, block_service] import "."/[common, api, block_service]
logScope: service = "attestation_service" const
ServiceName = "attestation_service"
logScope: service = ServiceName
type type
AggregateItem* = object AggregateItem* = object
@ -48,7 +51,8 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
return false return false
let attestation = block: let attestation = block:
let signature = block: let signature =
try:
let res = await validator.getAttestationSignature( let res = await validator.getAttestationSignature(
fork, vc.beaconGenesis.genesis_validators_root, adata) fork, vc.beaconGenesis.genesis_validators_root, adata)
if res.isErr(): if res.isErr():
@ -56,6 +60,13 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
error_msg = res.error() error_msg = res.error()
return false return false
res.get() res.get()
except CancelledError as exc:
debug "Attestation signature process was interrupted"
raise exc
except CatchableError as exc:
error "An unexpected error occurred while signing attestation",
err_name = exc.name, err_msg = exc.msg
return false
Attestation.init( Attestation.init(
[duty.data.validator_committee_index], [duty.data.validator_committee_index],
@ -76,9 +87,9 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
validator = shortLog(validator), validator = shortLog(validator),
validator_index = vindex validator_index = vindex
return false return false
except CancelledError: except CancelledError as exc:
debug "Publish attestation request was interrupted" debug "Attestation publishing process was interrupted"
return false raise exc
except CatchableError as exc: except CatchableError as exc:
error "Unexpected error occured while publishing attestation", error "Unexpected error occured while publishing attestation",
attestation = shortLog(attestation), attestation = shortLog(attestation),
@ -110,14 +121,15 @@ proc serveAggregateAndProof*(service: AttestationServiceRef,
genesisRoot = vc.beaconGenesis.genesis_validators_root genesisRoot = vc.beaconGenesis.genesis_validators_root
slot = proof.aggregate.data.slot slot = proof.aggregate.data.slot
fork = vc.forkAtEpoch(slot.epoch) fork = vc.forkAtEpoch(slot.epoch)
vindex = validator.index.get()
debug "Signing aggregate", validator = shortLog(validator), debug "Signing aggregate", validator = shortLog(validator),
attestation = shortLog(proof.aggregate), fork = fork attestation = shortLog(proof.aggregate), fork = fork
let signature = let signature =
block: try:
let res = await getAggregateAndProofSignature( let res = await validator.getAggregateAndProofSignature(
validator, fork, genesisRoot, proof) fork, genesisRoot, proof)
if res.isErr(): if res.isErr():
error "Unable to sign aggregate and proof using remote signer", error "Unable to sign aggregate and proof using remote signer",
validator = shortLog(validator), validator = shortLog(validator),
@ -125,11 +137,20 @@ proc serveAggregateAndProof*(service: AttestationServiceRef,
error_msg = res.error() error_msg = res.error()
return false return false
res.get() res.get()
except CancelledError as exc:
debug "Aggregated attestation signing process was interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error occured while signing aggregated attestation",
validator = shortLog(validator),
attestation = shortLog(proof.aggregate),
validator_index = vindex,
err_name = exc.name, err_msg = exc.msg
return false
let signedProof = SignedAggregateAndProof(message: proof, let signedProof = SignedAggregateAndProof(message: proof,
signature: signature) signature: signature)
let vindex = validator.index.get()
debug "Sending aggregated attestation", fork = fork, debug "Sending aggregated attestation", fork = fork,
attestation = shortLog(signedProof.message.aggregate), attestation = shortLog(signedProof.message.aggregate),
validator = shortLog(validator), validator_index = vindex, validator = shortLog(validator), validator_index = vindex,
@ -144,9 +165,9 @@ proc serveAggregateAndProof*(service: AttestationServiceRef,
validator = shortLog(validator), validator = shortLog(validator),
validator_index = vindex validator_index = vindex
return false return false
except CancelledError: except CancelledError as exc:
debug "Publish aggregate and proofs request was interrupted" debug "Publish aggregate and proofs request was interrupted"
return false raise exc
except CatchableError as exc: except CatchableError as exc:
error "Unexpected error occured while publishing aggregated attestation", error "Unexpected error occured while publishing aggregated attestation",
attestation = shortLog(signedProof.message.aggregate), attestation = shortLog(signedProof.message.aggregate),
@ -199,11 +220,12 @@ proc produceAndPublishAttestations*(service: AttestationServiceRef,
var errored, succeed, failed = 0 var errored, succeed, failed = 0
try: try:
await allFutures(pendingAttestations) await allFutures(pendingAttestations)
except CancelledError: except CancelledError as exc:
for fut in pendingAttestations: for fut in pendingAttestations:
if not(fut.finished()): if not(fut.finished()):
fut.cancel() fut.cancel()
await allFutures(pendingAttestations) await allFutures(pendingAttestations)
raise exc
for future in pendingAttestations: for future in pendingAttestations:
if future.done(): if future.done():
@ -263,9 +285,9 @@ proc produceAndPublishAggregates(service: AttestationServiceRef,
error "Unable to get aggregated attestation data", slot = slot, error "Unable to get aggregated attestation data", slot = slot,
attestation_root = shortLog(attestationRoot) attestation_root = shortLog(attestationRoot)
return return
except CancelledError: except CancelledError as exc:
debug "Aggregated attestation request was interrupted" debug "Aggregated attestation request was interrupted"
return raise exc
except CatchableError as exc: except CatchableError as exc:
error "Unexpected error occured while getting aggregated attestation", error "Unexpected error occured while getting aggregated attestation",
slot = slot, attestation_root = shortLog(attestationRoot), slot = slot, attestation_root = shortLog(attestationRoot),
@ -289,11 +311,12 @@ proc produceAndPublishAggregates(service: AttestationServiceRef,
var errored, succeed, failed = 0 var errored, succeed, failed = 0
try: try:
await allFutures(pendingAggregates) await allFutures(pendingAggregates)
except CancelledError: except CancelledError as exc:
for fut in pendingAggregates: for fut in pendingAggregates:
if not(fut.finished()): if not(fut.finished()):
fut.cancel() fut.cancel()
await allFutures(pendingAggregates) await allFutures(pendingAggregates)
raise exc
for future in pendingAggregates: for future in pendingAggregates:
if future.done(): if future.done():
@ -327,9 +350,9 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef,
await vc.waitForBlockPublished(slot).wait(nanoseconds(timeout.nanoseconds)) await vc.waitForBlockPublished(slot).wait(nanoseconds(timeout.nanoseconds))
let dur = Moment.now() - startTime let dur = Moment.now() - startTime
debug "Block proposal awaited", slot = slot, duration = dur debug "Block proposal awaited", slot = slot, duration = dur
except CancelledError: except CancelledError as exc:
debug "Block proposal waiting was interrupted" debug "Block proposal waiting was interrupted"
return raise exc
except AsyncTimeoutError: except AsyncTimeoutError:
let dur = Moment.now() - startTime let dur = Moment.now() - startTime
debug "Block was not produced in time", slot = slot, duration = dur debug "Block was not produced in time", slot = slot, duration = dur
@ -346,9 +369,9 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef,
error "Unable to proceed attestations", slot = slot, error "Unable to proceed attestations", slot = slot,
committee_index = committee_index, duties_count = len(duties) committee_index = committee_index, duties_count = len(duties)
return return
except CancelledError: except CancelledError as exc:
debug "Publish attestation request was interrupted" debug "Publish attestation request was interrupted"
return raise exc
except CatchableError as exc: except CatchableError as exc:
error "Unexpected error while producing attestations", slot = slot, error "Unexpected error while producing attestations", slot = slot,
committee_index = committee_index, duties_count = len(duties), committee_index = committee_index, duties_count = len(duties),
@ -414,9 +437,10 @@ proc mainLoop(service: AttestationServiceRef) {.async.} =
proc init*(t: typedesc[AttestationServiceRef], proc init*(t: typedesc[AttestationServiceRef],
vc: ValidatorClientRef): Future[AttestationServiceRef] {.async.} = vc: ValidatorClientRef): Future[AttestationServiceRef] {.async.} =
debug "Initializing service" logScope: service = ServiceName
let res = AttestationServiceRef(name: "attestation_service", let res = AttestationServiceRef(name: ServiceName,
client: vc, state: ServiceState.Initialized) client: vc, state: ServiceState.Initialized)
debug "Initializing service"
return res return res
proc start*(service: AttestationServiceRef) = proc start*(service: AttestationServiceRef) =

View File

@ -21,16 +21,21 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
genesis_root = genesisRoot, genesis_root = genesisRoot,
graffiti = graffiti, fork = fork, slot = slot, graffiti = graffiti, fork = fork, slot = slot,
wall_slot = currentSlot wall_slot = currentSlot
try:
let randaoReveal = let randaoReveal =
block: try:
let res = await validator.getEpochSignature( let res = await validator.getEpochSignature(fork, genesisRoot, slot.epoch)
fork, genesisRoot, slot.epoch)
if res.isErr(): if res.isErr():
error "Unable to generate randao reveal usint remote signer", error "Unable to generate randao reveal usint remote signer",
validator = shortLog(validator), error_msg = res.error() validator = shortLog(validator), error_msg = res.error()
return return
res.get() res.get()
except CancelledError as exc:
error "Randao reveal processing was interrupted"
raise exc
except CatchableError as exc:
error "An unexpected error occurred while receiving randao data",
err_name = exc.name, err_msg = exc.msg
return
let beaconBlock = let beaconBlock =
try: try:
@ -39,6 +44,9 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
error "Unable to retrieve block data", slot = slot, error "Unable to retrieve block data", slot = slot,
wall_slot = currentSlot, validator = shortLog(validator) wall_slot = currentSlot, validator = shortLog(validator)
return return
except CancelledError as exc:
error "Producing block processing was interrupted"
raise exc
except CatchableError as exc: except CatchableError as exc:
error "An unexpected error occurred while getting block data", error "An unexpected error occurred while getting block data",
err_name = exc.name, err_msg = exc.msg err_name = exc.name, err_msg = exc.msg
@ -55,7 +63,7 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
if notSlashable.isOk(): if notSlashable.isOk():
let signature = let signature =
block: try:
let res = await validator.getBlockSignature(fork, genesisRoot, let res = await validator.getBlockSignature(fork, genesisRoot,
slot, blockRoot, slot, blockRoot,
beaconBlock) beaconBlock)
@ -64,6 +72,13 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
validator = shortLog(validator), error_msg = res.error() validator = shortLog(validator), error_msg = res.error()
return return
res.get() res.get()
except CancelledError as exc:
debug "Block signature processing was interrupted"
raise exc
except CatchableError as exc:
error "An unexpected error occurred while signing block",
err_name = exc.name, err_msg = exc.msg
return
debug "Sending block", debug "Sending block",
blockRoot = shortLog(blockRoot), blck = shortLog(beaconBlock), blockRoot = shortLog(blockRoot), blck = shortLog(beaconBlock),
@ -83,6 +98,9 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
validator_index = validator.index.get(), validator_index = validator.index.get(),
wall_slot = currentSlot wall_slot = currentSlot
return return
except CancelledError as exc:
debug "Publishing block processing was interrupted"
raise exc
except CatchableError as exc: except CatchableError as exc:
error "An unexpected error occurred while publishing block", error "An unexpected error occurred while publishing block",
err_name = exc.name, err_msg = exc.msg err_name = exc.name, err_msg = exc.msg
@ -104,9 +122,6 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
validator = shortLog(validator), validator = shortLog(validator),
wall_slot = currentSlot, wall_slot = currentSlot,
existingProposal = notSlashable.error existingProposal = notSlashable.error
except CatchableError as exc:
error "Unexpected error happens while proposing block",
error_name = exc.name, error_msg = exc.msg
proc proposeBlock(vc: ValidatorClientRef, slot: Slot, proc proposeBlock(vc: ValidatorClientRef, slot: Slot,
proposerKey: ValidatorPubKey) {.async.} = proposerKey: ValidatorPubKey) {.async.} =
@ -130,10 +145,10 @@ proc proposeBlock(vc: ValidatorClientRef, slot: Slot,
return return
res.get() res.get()
await vc.publishBlock(currentSlot, slot, validator) await vc.publishBlock(currentSlot, slot, validator)
except CancelledError as exc:
except CancelledError: debug "Block proposing was interrupted", slot = slot,
debug "Proposing task was cancelled", slot = slot,
validator = shortLog(proposerKey) validator = shortLog(proposerKey)
raise exc
proc spawnProposalTask(vc: ValidatorClientRef, proc spawnProposalTask(vc: ValidatorClientRef,
duty: RestProposerDuty): ProposerTask = duty: RestProposerDuty): ProposerTask =
@ -251,4 +266,13 @@ proc waitForBlockPublished*(vc: ValidatorClientRef, slot: Slot) {.async.} =
res.add(task.future) res.add(task.future)
res res
if len(pendingTasks) > 0: if len(pendingTasks) > 0:
try:
await allFutures(pendingTasks) await allFutures(pendingTasks)
except CancelledError as exc:
var pending: seq[Future[void]]
for future in pendingTasks:
if not(future.finished()):
pending.add(future.cancelAndWait())
await allFutures(pending)
raise exc

View File

@ -120,7 +120,6 @@ type
config*: ValidatorClientConf config*: ValidatorClientConf
graffitiBytes*: GraffitiBytes graffitiBytes*: GraffitiBytes
beaconNodes*: seq[BeaconNodeServerRef] beaconNodes*: seq[BeaconNodeServerRef]
nodesAvailable*: AsyncEvent
fallbackService*: FallbackServiceRef fallbackService*: FallbackServiceRef
forkService*: ForkServiceRef forkService*: ForkServiceRef
dutiesService*: DutiesServiceRef dutiesService*: DutiesServiceRef
@ -134,6 +133,8 @@ type
attachedValidators*: ValidatorPool attachedValidators*: ValidatorPool
forks*: seq[Fork] forks*: seq[Fork]
forksAvailable*: AsyncEvent forksAvailable*: AsyncEvent
nodesAvailable*: AsyncEvent
gracefulExit*: AsyncEvent
attesters*: AttesterMap attesters*: AttesterMap
proposers*: ProposerMap proposers*: ProposerMap
syncCommitteeDuties*: SyncCommitteeDutiesMap syncCommitteeDuties*: SyncCommitteeDutiesMap
@ -173,13 +174,13 @@ chronicles.expandIt(RestAttesterDuty):
validator_committee_index = it.validator_committee_index validator_committee_index = it.validator_committee_index
proc stop*(csr: ClientServiceRef) {.async.} = proc stop*(csr: ClientServiceRef) {.async.} =
debug "Stopping service", service_name = csr.name debug "Stopping service", service = csr.name
if csr.state == ServiceState.Running: if csr.state == ServiceState.Running:
csr.state = ServiceState.Closing csr.state = ServiceState.Closing
if not(csr.lifeFut.finished()): if not(csr.lifeFut.finished()):
await csr.lifeFut.cancelAndWait() await csr.lifeFut.cancelAndWait()
csr.state = ServiceState.Closed csr.state = ServiceState.Closed
debug "Service stopped", service_name = csr.name debug "Service stopped", service = csr.name
proc isDefault*(dap: DutyAndProof): bool = proc isDefault*(dap: DutyAndProof): bool =
dap.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64) dap.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)

View File

@ -2,7 +2,10 @@ import std/[sets, sequtils]
import chronicles import chronicles
import common, api, block_service import common, api, block_service
logScope: service = "duties_service" const
ServiceName = "duties_service"
logScope: service = ServiceName
type type
DutiesServiceLoop* = enum DutiesServiceLoop* = enum
@ -54,9 +57,9 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} =
except ValidatorApiError: except ValidatorApiError:
error "Unable to get head state's validator information" error "Unable to get head state's validator information"
return return
except CancelledError: except CancelledError as exc:
debug "Validator's indices request was interrupted" debug "Validator's indices processing was interrupted"
return raise exc
except CatchableError as exc: except CatchableError as exc:
error "Unexpected error occurred while getting validator information", error "Unexpected error occurred while getting validator information",
err_name = exc.name, err_msg = exc.msg err_name = exc.name, err_msg = exc.msg
@ -110,9 +113,9 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
except ValidatorApiError: except ValidatorApiError:
error "Unable to get attester duties", epoch = epoch error "Unable to get attester duties", epoch = epoch
return 0 return 0
except CancelledError: except CancelledError as exc:
debug "Attester duties request was interrupted" debug "Attester duties processing was interrupted"
return 0 raise exc
except CatchableError as exc: except CatchableError as exc:
error "Unexpected error occured while getting attester duties", error "Unexpected error occured while getting attester duties",
epoch = epoch, err_name = exc.name, err_msg = exc.msg epoch = epoch, err_name = exc.name, err_msg = exc.msg
@ -164,19 +167,27 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
res res
if len(addOrReplaceItems) > 0: if len(addOrReplaceItems) > 0:
var pending: seq[Future[SignatureResult]] var pendingRequests: seq[Future[SignatureResult]]
var validators: seq[AttachedValidator] var validators: seq[AttachedValidator]
for item in addOrReplaceItems: for item in addOrReplaceItems:
let validator = vc.attachedValidators.getValidator(item.duty.pubkey) let validator = vc.attachedValidators.getValidator(item.duty.pubkey)
let fork = vc.forkAtEpoch(item.duty.slot.epoch) let fork = vc.forkAtEpoch(item.duty.slot.epoch)
let future = validator.getSlotSignature( let future = validator.getSlotSignature(
fork, genesisRoot, item.duty.slot) fork, genesisRoot, item.duty.slot)
pending.add(future) pendingRequests.add(future)
validators.add(validator) validators.add(validator)
await allFutures(pending) try:
await allFutures(pendingRequests)
except CancelledError as exc:
var pendingCancel: seq[Future[void]]
for future in pendingRequests:
if not(future.finished()):
pendingCancel.add(future.cancelAndWait())
await allFutures(pendingCancel)
raise exc
for index, fut in pending: for index, fut in pendingRequests:
let item = addOrReplaceItems[index] let item = addOrReplaceItems[index]
let dap = let dap =
if fut.done(): if fut.done():
@ -219,9 +230,9 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef,
except ValidatorApiError: except ValidatorApiError:
error "Unable to get sync committee duties", epoch = epoch error "Unable to get sync committee duties", epoch = epoch
return 0 return 0
except CancelledError: except CancelledError as exc:
debug "Request for sync committee duties was interrupted" debug "Sync committee duties processing was interrupted"
return 0 raise exc
except CatchableError as exc: except CatchableError as exc:
error "Unexpected error occurred while getting sync committee duties", error "Unexpected error occurred while getting sync committee duties",
epoch = epoch, err_name = exc.name, err_msg = exc.msg epoch = epoch, err_name = exc.name, err_msg = exc.msg
@ -261,7 +272,7 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef,
res res
if len(addOrReplaceItems) > 0: if len(addOrReplaceItems) > 0:
var pending: seq[Future[SignatureResult]] var pendingRequests: seq[Future[SignatureResult]]
var validators: seq[AttachedValidator] var validators: seq[AttachedValidator]
let sres = vc.getCurrentSlot() let sres = vc.getCurrentSlot()
if sres.isSome(): if sres.isSome():
@ -272,12 +283,20 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef,
genesisRoot, genesisRoot,
sres.get(), sres.get(),
getSubcommitteeIndex(item.duty.validator_sync_committee_index)) getSubcommitteeIndex(item.duty.validator_sync_committee_index))
pending.add(future) pendingRequests.add(future)
validators.add(validator) validators.add(validator)
await allFutures(pending) try:
await allFutures(pendingRequests)
except CancelledError as exc:
var pendingCancel: seq[Future[void]]
for future in pendingRequests:
if not(future.finished()):
pendingCancel.add(future.cancelAndWait())
await allFutures(pendingCancel)
raise exc
for index, fut in pending: for index, fut in pendingRequests:
let item = addOrReplaceItems[index] let item = addOrReplaceItems[index]
let dap = let dap =
if fut.done(): if fut.done():
@ -442,8 +461,9 @@ proc pollForBeaconProposers*(vc: ValidatorClientRef) {.async.} =
except ValidatorApiError: except ValidatorApiError:
debug "Unable to get proposer duties", slot = currentSlot, debug "Unable to get proposer duties", slot = currentSlot,
epoch = currentEpoch epoch = currentEpoch
except CancelledError: except CancelledError as exc:
debug "Proposer duties request was interrupted" debug "Proposer duties processing was interrupted"
raise exc
except CatchableError as exc: except CatchableError as exc:
debug "Unexpected error occured while getting proposer duties", debug "Unexpected error occured while getting proposer duties",
slot = currentSlot, epoch = currentEpoch, err_name = exc.name, slot = currentSlot, epoch = currentEpoch, err_name = exc.name,
@ -512,10 +532,10 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
debug "Service started" debug "Service started"
var var
fut1 = service.attesterDutiesLoop() attestFut = service.attesterDutiesLoop()
fut2 = service.proposerDutiesLoop() proposeFut = service.proposerDutiesLoop()
fut3 = service.validatorIndexLoop() indicesFut = service.validatorIndexLoop()
fut4 = service.syncCommitteeeDutiesLoop() syncFut = service.syncCommitteeeDutiesLoop()
while true: while true:
# This loop could look much more nicer/better, when # This loop could look much more nicer/better, when
@ -523,20 +543,25 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
# become safe to combine loops, breaks and exception handlers. # become safe to combine loops, breaks and exception handlers.
let breakLoop = let breakLoop =
try: try:
discard await race(fut1, fut2, fut3, fut4) discard await race(attestFut, proposeFut, indicesFut, syncFut)
checkAndRestart(AttesterLoop, fut1, service.attesterDutiesLoop()) checkAndRestart(AttesterLoop, attestFut, service.attesterDutiesLoop())
checkAndRestart(ProposerLoop, fut2, service.proposerDutiesLoop()) checkAndRestart(ProposerLoop, proposeFut, service.proposerDutiesLoop())
checkAndRestart(IndicesLoop, fut3, service.validatorIndexLoop()) checkAndRestart(IndicesLoop, indicesFut, service.validatorIndexLoop())
checkAndRestart(SyncCommitteeLoop, checkAndRestart(SyncCommitteeLoop,
fut4, service.syncCommitteeeDutiesLoop()) syncFut, service.syncCommitteeeDutiesLoop())
false false
except CancelledError: except CancelledError:
if not(fut1.finished()): fut1.cancel()
if not(fut2.finished()): fut2.cancel()
if not(fut3.finished()): fut3.cancel()
if not(fut4.finished()): fut4.cancel()
await allFutures(fut1, fut2, fut3, fut4)
debug "Service interrupted" debug "Service interrupted"
var pending: seq[Future[void]]
if not(attestFut.finished()):
pending.add(attestFut.cancelAndWait())
if not(proposeFut.finished()):
pending.add(proposeFut.cancelAndWait())
if not(indicesFut.finished()):
pending.add(indicesFut.cancelAndWait())
if not(syncFut.finished()):
pending.add(syncFut.cancelAndWait())
await allFutures(pending)
true true
except CatchableError as exc: except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name, warn "Service crashed with unexpected error", err_name = exc.name,
@ -548,7 +573,8 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
proc init*(t: typedesc[DutiesServiceRef], proc init*(t: typedesc[DutiesServiceRef],
vc: ValidatorClientRef): Future[DutiesServiceRef] {.async.} = vc: ValidatorClientRef): Future[DutiesServiceRef] {.async.} =
let res = DutiesServiceRef(name: "duties_service", logScope: service = ServiceName
let res = DutiesServiceRef(name: ServiceName,
client: vc, state: ServiceState.Initialized) client: vc, state: ServiceState.Initialized)
debug "Initializing service" debug "Initializing service"
# We query for indices first, to avoid empty queries for duties. # We query for indices first, to avoid empty queries for duties.

View File

@ -1,6 +1,9 @@
import common import common
logScope: service = "fallback_service" const
ServiceName = "fallback_service"
logScope: service = ServiceName
type type
BeaconNodesCounters* = object BeaconNodesCounters* = object
@ -38,7 +41,8 @@ proc getNodeCounts*(vc: ValidatorClientRef): BeaconNodesCounters =
inc(res.online) inc(res.online)
res res
proc waitOnlineNodes*(vc: ValidatorClientRef) {.async.} = proc waitOnlineNodes*(vc: ValidatorClientRef,
timeoutFut: Future[void] = nil) {.async.} =
doAssert(not(isNil(vc.fallbackService))) doAssert(not(isNil(vc.fallbackService)))
while true: while true:
if vc.onlineNodesCount() != 0: if vc.onlineNodesCount() != 0:
@ -50,7 +54,26 @@ proc waitOnlineNodes*(vc: ValidatorClientRef) {.async.} =
online_nodes = vc.onlineNodesCount(), online_nodes = vc.onlineNodesCount(),
unusable_nodes = vc.unusableNodesCount(), unusable_nodes = vc.unusableNodesCount(),
total_nodes = len(vc.beaconNodes) total_nodes = len(vc.beaconNodes)
if isNil(timeoutFut):
await vc.fallbackService.onlineEvent.wait() await vc.fallbackService.onlineEvent.wait()
else:
let breakLoop =
block:
let waitFut = vc.fallbackService.onlineEvent.wait()
try:
discard await race(waitFut, timeoutFut)
except CancelledError as exc:
if not(waitFut.finished()):
await waitFut.cancelAndWait()
raise exc
if not(waitFut.finished()):
await waitFut.cancelAndWait()
true
else:
false
if breakLoop:
break
proc checkCompatible(vc: ValidatorClientRef, proc checkCompatible(vc: ValidatorClientRef,
node: BeaconNodeServerRef) {.async.} = node: BeaconNodeServerRef) {.async.} =
@ -207,13 +230,10 @@ proc checkNodes*(service: FallbackServiceRef) {.async.} =
try: try:
await allFutures(pendingChecks) await allFutures(pendingChecks)
except CancelledError as exc: except CancelledError as exc:
let pending = var pending: seq[Future[void]]
block: for future in pendingChecks:
var res: seq[Future[void]] if not(future.finished()):
for fut in pendingChecks: pending.add(future.cancelAndWait())
if not(fut.finished()):
res.add(fut.cancelAndWait())
res
await allFutures(pending) await allFutures(pending)
raise exc raise exc
@ -255,10 +275,11 @@ proc mainLoop(service: FallbackServiceRef) {.async.} =
proc init*(t: typedesc[FallbackServiceRef], proc init*(t: typedesc[FallbackServiceRef],
vc: ValidatorClientRef): Future[FallbackServiceRef] {.async.} = vc: ValidatorClientRef): Future[FallbackServiceRef] {.async.} =
debug "Initializing service" logScope: service = ServiceName
var res = FallbackServiceRef(name: "fallback_service", client: vc, var res = FallbackServiceRef(name: ServiceName, client: vc,
state: ServiceState.Initialized, state: ServiceState.Initialized,
onlineEvent: newAsyncEvent()) onlineEvent: newAsyncEvent())
debug "Initializing service"
# Perform initial nodes check. # Perform initial nodes check.
await res.checkNodes() await res.checkNodes()
return res return res

View File

@ -2,7 +2,10 @@ import std/algorithm
import chronicles import chronicles
import common, api import common, api
logScope: service = "fork_service" const
ServiceName = "fork_service"
logScope: service = ServiceName
proc validateForkSchedule(forks: openArray[Fork]): bool {.raises: [Defect].} = proc validateForkSchedule(forks: openArray[Fork]): bool {.raises: [Defect].} =
# Check if `forks` list is linked list. # Check if `forks` list is linked list.
@ -45,6 +48,9 @@ proc pollForFork(vc: ValidatorClientRef) {.async.} =
except ValidatorApiError as exc: except ValidatorApiError as exc:
error "Unable to retrieve fork schedule", reason = exc.msg error "Unable to retrieve fork schedule", reason = exc.msg
return return
except CancelledError as exc:
debug "Fork retrieval process was interrupted"
raise exc
except CatchableError as exc: except CatchableError as exc:
error "Unexpected error occured while getting fork information", error "Unexpected error occured while getting fork information",
err_name = exc.name, err_msg = exc.msg err_name = exc.name, err_msg = exc.msg
@ -96,9 +102,10 @@ proc mainLoop(service: ForkServiceRef) {.async.} =
proc init*(t: typedesc[ForkServiceRef], proc init*(t: typedesc[ForkServiceRef],
vc: ValidatorClientRef): Future[ForkServiceRef] {.async.} = vc: ValidatorClientRef): Future[ForkServiceRef] {.async.} =
debug "Initializing service" logScope: service = ServiceName
let res = ForkServiceRef(name: "fork_service", let res = ForkServiceRef(name: ServiceName,
client: vc, state: ServiceState.Initialized) client: vc, state: ServiceState.Initialized)
debug "Initializing service"
await vc.pollForFork() await vc.pollForFork()
return res return res

View File

@ -12,7 +12,10 @@ import
../spec/datatypes/[phase0, altair, bellatrix], ../spec/datatypes/[phase0, altair, bellatrix],
../spec/eth2_apis/rest_types ../spec/eth2_apis/rest_types
logScope: service = "sync_committee_service" const
ServiceName = "sync_committee_service"
logScope: service = ServiceName
type type
ContributionItem* = object ContributionItem* = object
@ -399,9 +402,10 @@ proc mainLoop(service: SyncCommitteeServiceRef) {.async.} =
proc init*(t: typedesc[SyncCommitteeServiceRef], proc init*(t: typedesc[SyncCommitteeServiceRef],
vc: ValidatorClientRef): Future[SyncCommitteeServiceRef] {.async.} = vc: ValidatorClientRef): Future[SyncCommitteeServiceRef] {.async.} =
debug "Initializing service" logScope: service = ServiceName
let res = SyncCommitteeServiceRef(name: "sync_committee_service", let res = SyncCommitteeServiceRef(name: ServiceName,
client: vc, state: ServiceState.Initialized) client: vc, state: ServiceState.Initialized)
debug "Initializing service"
return res return res
proc start*(service: SyncCommitteeServiceRef) = proc start*(service: SyncCommitteeServiceRef) =