Validator client various fixes. (#3840)

* Improve fallback_service.
* Fix nextAction negative time issue.
* Improve logging in fallback_service.
* Improve logging in sync_committee_service.
* Prepare all services for cancellation.
* Signals handlers for validator client
* Address #3800

Co-authored-by: Zahary Karadjov <zahary@gmail.com>
This commit is contained in:
Eugene Kabanov 2022-07-13 17:43:57 +03:00 committed by GitHub
parent 06c8e10ae2
commit 263a2ffa14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 702 additions and 549 deletions

View File

@ -1330,8 +1330,8 @@ func syncStatus(node: BeaconNode): string =
else:
"synced"
proc onSlotStart(
node: BeaconNode, wallTime: BeaconTime, lastSlot: Slot) {.async.} =
proc onSlotStart(node: BeaconNode, wallTime: BeaconTime,
lastSlot: Slot): Future[bool] {.async.} =
## Called at the beginning of a slot - usually every slot, but sometimes might
## skip a few in case we're running late.
## wallTime: current system time - we will strive to perform all duties up
@ -1357,7 +1357,8 @@ proc onSlotStart(
delay = shortLog(delay)
# Check before any re-scheduling of onSlotStart()
checkIfShouldStopAtEpoch(wallSlot, node.config.stopAtEpoch)
if checkIfShouldStopAtEpoch(wallSlot, node.config.stopAtEpoch):
quit(0)
when defined(windows):
if node.config.runAsService:
@ -1379,6 +1380,8 @@ proc onSlotStart(
await onSlotEnd(node, wallSlot)
return false
proc handleMissingBlocks(node: BeaconNode) =
let missingBlocks = node.quarantine[].checkMissing()
if missingBlocks.len > 0:
@ -1605,7 +1608,7 @@ proc run(node: BeaconNode) {.raises: [Defect, CatchableError].} =
proc SIGTERMHandler(signal: cint) {.noconv.} =
notice "Shutting down after having received SIGTERM"
bnStatus = BeaconNodeStatus.Stopping
c_signal(SIGTERM, SIGTERMHandler)
c_signal(ansi_c.SIGTERM, SIGTERMHandler)
# main event loop
while bnStatus == BeaconNodeStatus.Running:
@ -2153,7 +2156,7 @@ programMain:
proc exitImmediatelyOnSIGTERM(signal: cint) {.noconv.} =
notice "Shutting down after having received SIGTERM"
quit 0
c_signal(SIGTERM, exitImmediatelyOnSIGTERM)
c_signal(ansi_c.SIGTERM, exitImmediatelyOnSIGTERM)
when defined(windows):
if config.runAsService:

View File

@ -31,7 +31,7 @@ export
type
SlotStartProc*[T] = proc(node: T, wallTime: BeaconTime,
lastSlot: Slot): Future[void] {.gcsafe,
lastSlot: Slot): Future[bool] {.gcsafe,
raises: [Defect].}
# silly chronicles, colors is a compile-time property
@ -216,7 +216,8 @@ template makeBannerAndConfig*(clientId: string, ConfType: type): untyped =
{.pop.}
config
proc checkIfShouldStopAtEpoch*(scheduledSlot: Slot, stopAtEpoch: uint64) =
proc checkIfShouldStopAtEpoch*(scheduledSlot: Slot,
stopAtEpoch: uint64): bool =
# Offset backwards slightly to allow this epoch's finalization check to occur
if scheduledSlot > 3 and stopAtEpoch > 0'u64 and
(scheduledSlot - 3).epoch() >= stopAtEpoch:
@ -224,9 +225,9 @@ proc checkIfShouldStopAtEpoch*(scheduledSlot: Slot, stopAtEpoch: uint64) =
chosenEpoch = stopAtEpoch,
epoch = scheduledSlot.epoch(),
slot = scheduledSlot
# Brute-force, but ensure it's reliable enough to run in CI.
quit(0)
true
else:
false
proc resetStdin*() =
when defined(posix):
@ -302,7 +303,9 @@ proc runSlotLoop*[T](node: T, startTime: BeaconTime,
curSlot = shortLog(curSlot),
nextSlot = shortLog(curSlot)
await slotProc(node, wallTime, curSlot)
let breakLoop = await slotProc(node, wallTime, curSlot)
if breakLoop:
break
curSlot = wallSlot
nextSlot = wallSlot + 1

View File

@ -29,7 +29,8 @@ proc onSecond(
getBeaconTime: GetBeaconTimeFn) =
## This procedure will be called once per second.
let wallSlot = getBeaconTime().slotOrZero()
checkIfShouldStopAtEpoch(wallSlot, config.stopAtEpoch)
if checkIfShouldStopAtEpoch(wallSlot, config.stopAtEpoch):
quit(0)
lightClient.updateGossipStatus(wallSlot + 1)

View File

@ -19,7 +19,7 @@ proc initGenesis(vc: ValidatorClientRef): Future[RestGenesis] {.async.} =
try:
await allFutures(pending)
except CancelledError as exc:
warn "Unexpected cancellation interrupt"
debug "Genesis information request was interrupted"
raise exc
let (errorNodes, genesisList) =
@ -104,6 +104,36 @@ proc initClock(vc: ValidatorClientRef): Future[BeaconClock] {.async.} =
await sleepAsync(genesisTime.offset)
return res
proc onSlotStart(vc: ValidatorClientRef, wallTime: BeaconTime,
lastSlot: Slot): Future[bool] {.async.} =
## Called at the beginning of a slot - usually every slot, but sometimes might
## skip a few in case we're running late.
## wallTime: current system time - we will strive to perform all duties up
## to this point in time
## lastSlot: the last slot that we successfully processed, so we know where to
## start work from - there might be jumps if processing is delayed
let
# The slot we should be at, according to the clock
beaconTime = wallTime
wallSlot = wallTime.toSlot()
let
# If everything was working perfectly, the slot that we should be processing
expectedSlot = lastSlot + 1
delay = wallTime - expectedSlot.start_beacon_time()
if checkIfShouldStopAtEpoch(wallSlot.slot, vc.config.stopAtEpoch):
return true
info "Slot start",
slot = shortLog(wallSlot.slot),
attestationIn = vc.getDurationToNextAttestation(wallSlot.slot),
blockIn = vc.getDurationToNextBlock(wallSlot.slot),
delay = shortLog(delay)
return false
proc asyncInit(vc: ValidatorClientRef) {.async.} =
vc.beaconGenesis = await vc.initGenesis()
info "Genesis information", genesis_time = vc.beaconGenesis.genesis_time,
@ -122,38 +152,16 @@ proc asyncInit(vc: ValidatorClientRef) {.async.} =
vc.config.validatorsDir(), "slashing_protection"
)
try:
vc.fallbackService = await FallbackServiceRef.init(vc)
vc.forkService = await ForkServiceRef.init(vc)
vc.dutiesService = await DutiesServiceRef.init(vc)
vc.attestationService = await AttestationServiceRef.init(vc)
vc.syncCommitteeService = await SyncCommitteeServiceRef.init(vc)
proc onSlotStart(vc: ValidatorClientRef, wallTime: BeaconTime,
lastSlot: Slot) {.async.} =
## Called at the beginning of a slot - usually every slot, but sometimes might
## skip a few in case we're running late.
## wallTime: current system time - we will strive to perform all duties up
## to this point in time
## lastSlot: the last slot that we successfully processed, so we know where to
## start work from - there might be jumps if processing is delayed
let
# The slot we should be at, according to the clock
beaconTime = wallTime
wallSlot = wallTime.toSlot()
let
# If everything was working perfectly, the slot that we should be processing
expectedSlot = lastSlot + 1
delay = wallTime - expectedSlot.start_beacon_time()
checkIfShouldStopAtEpoch(wallSlot.slot, vc.config.stopAtEpoch)
info "Slot start",
slot = shortLog(wallSlot.slot),
attestationIn = vc.getDurationToNextAttestation(wallSlot.slot),
blockIn = vc.getDurationToNextBlock(wallSlot.slot),
delay = shortLog(delay)
except CancelledError:
debug "Initialization process interrupted"
info "Closing slashing protection", path = vc.config.validatorsDir()
vc.attachedValidators.slashingProtection.close()
proc asyncRun(vc: ValidatorClientRef) {.async.} =
vc.fallbackService.start()
@ -162,7 +170,66 @@ proc asyncRun(vc: ValidatorClientRef) {.async.} =
vc.attestationService.start()
vc.syncCommitteeService.start()
await runSlotLoop(vc, vc.beaconClock.now(), onSlotStart)
try:
vc.runSlotLoopFut = runSlotLoop(vc, vc.beaconClock.now(), onSlotStart)
await vc.runSlotLoopFut
except CancelledError:
debug "Main loop interrupted"
except CatchableError as exc:
debug "Main loop failed with an error", err_name = $exc.name,
err_msg = $exc.msg
info "Closing slashing protection", path = vc.config.validatorsDir()
vc.attachedValidators.slashingProtection.close()
debug "Stopping main processing loop"
var pending: seq[Future[void]]
if not(vc.runSlotLoopFut.finished()):
pending.add(vc.runSlotLoopFut.cancelAndWait())
if not(vc.sigintHandleFut.finished()):
pending.add(vc.sigintHandleFut.cancelAndWait())
if not(vc.sigtermHandleFut.finished()):
pending.add(vc.sigtermHandleFut.cancelAndWait())
debug "Stopping running services"
pending.add(vc.fallbackService.stop())
pending.add(vc.forkService.stop())
pending.add(vc.dutiesService.stop())
pending.add(vc.attestationService.stop())
pending.add(vc.syncCommitteeService.stop())
await allFutures(pending)
template runWithSignals(vc: ValidatorClientRef, body: untyped): bool =
let future = body
discard await race(future, vc.sigintHandleFut, vc.sigtermHandleFut)
if future.finished():
if future.failed() or future.cancelled():
let exc = future.readError()
debug "Validator client initialization failed", err_name = $exc.name,
err_msg = $exc.msg
var pending: seq[Future[void]]
if not(vc.sigintHandleFut.finished()):
pending.add(cancelAndWait(vc.sigintHandleFut))
if not(vc.sigtermHandleFut.finished()):
pending.add(cancelAndWait(vc.sigtermHandleFut))
await allFutures(pending)
false
else:
true
else:
let signal = if vc.sigintHandleFut.finished(): "SIGINT" else: "SIGTERM"
info "Got interrupt, trying to shutdown gracefully", signal = signal
var pending = @[cancelAndWait(future)]
if not(vc.sigintHandleFut.finished()):
pending.add(cancelAndWait(vc.sigintHandleFut))
if not(vc.sigtermHandleFut.finished()):
pending.add(cancelAndWait(vc.sigtermHandleFut))
await allFutures(pending)
false
proc asyncLoop*(vc: ValidatorClientRef) {.async.} =
if not(vc.runWithSignals(asyncInit(vc))):
return
if not(vc.runWithSignals(asyncRun(vc))):
return
programMain:
let config = makeBannerAndConfig("Nimbus validator client " & fullVersionStr,
@ -193,13 +260,27 @@ programMain:
config,
beacon_nodes_count = len(beaconNodes)
var vc = ValidatorClientRef(
var vc =
when declared(waitSignal):
ValidatorClientRef(
config: config,
beaconNodes: beaconNodes,
graffitiBytes: config.graffiti.get(defaultGraffitiBytes()),
nodesAvailable: newAsyncEvent(),
forksAvailable: newAsyncEvent()
forksAvailable: newAsyncEvent(),
sigintHandleFut: waitSignal(SIGINT),
sigtermHandleFut: waitSignal(SIGTERM)
)
else:
ValidatorClientRef(
config: config,
beaconNodes: beaconNodes,
graffitiBytes: config.graffiti.get(defaultGraffitiBytes()),
nodesAvailable: newAsyncEvent(),
forksAvailable: newAsyncEvent(),
sigintHandleFut: newFuture[void]("sigint_placeholder"),
sigtermHandleFut: newFuture[void]("sigterm_placeholder")
)
waitFor asyncInit(vc)
waitFor asyncRun(vc)
waitFor asyncLoop(vc)
info "Validator client stopped"

View File

@ -1,6 +1,7 @@
import chronicles,
../spec/eth2_apis/eth2_rest_serialization,
../spec/datatypes/[phase0, altair], common
import chronicles
import ../spec/eth2_apis/eth2_rest_serialization,
../spec/datatypes/[phase0, altair]
import common, fallback_service
export eth2_rest_serialization, common
@ -9,236 +10,6 @@ type
ApiOperation = enum
Success, Timeout, Failure, Interrupt
proc checkCompatible*(vc: ValidatorClientRef,
node: BeaconNodeServerRef) {.async.} =
logScope: endpoint = node
let info =
try:
debug "Requesting beacon node network configuration"
let res = await node.client.getSpecVC()
res.data.data
except CancelledError as exc:
error "Configuration request was interrupted"
node.status = RestBeaconNodeStatus.Offline
raise exc
except RestError as exc:
error "Unable to obtain beacon node's configuration",
error_name = exc.name, error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
except CatchableError as exc:
error "Unexpected exception", error_name = exc.name,
error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
let genesis =
try:
debug "Requesting beacon node genesis information"
let res = await node.client.getGenesis()
res.data.data
except CancelledError as exc:
error "Genesis request was interrupted"
node.status = RestBeaconNodeStatus.Offline
raise exc
except RestError as exc:
error "Unable to obtain beacon node's genesis",
error_name = exc.name, error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
except CatchableError as exc:
error "Unexpected exception", error_name = exc.name,
error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
let genesisFlag = (genesis != vc.beaconGenesis)
let configFlag =
# /!\ Keep in sync with `spec/eth2_apis/rest_types.nim` > `RestSpecVC`.
info.MAX_VALIDATORS_PER_COMMITTEE != MAX_VALIDATORS_PER_COMMITTEE or
info.SLOTS_PER_EPOCH != SLOTS_PER_EPOCH or
info.SECONDS_PER_SLOT != SECONDS_PER_SLOT or
info.EPOCHS_PER_ETH1_VOTING_PERIOD != EPOCHS_PER_ETH1_VOTING_PERIOD or
info.SLOTS_PER_HISTORICAL_ROOT != SLOTS_PER_HISTORICAL_ROOT or
info.EPOCHS_PER_HISTORICAL_VECTOR != EPOCHS_PER_HISTORICAL_VECTOR or
info.EPOCHS_PER_SLASHINGS_VECTOR != EPOCHS_PER_SLASHINGS_VECTOR or
info.HISTORICAL_ROOTS_LIMIT != HISTORICAL_ROOTS_LIMIT or
info.VALIDATOR_REGISTRY_LIMIT != VALIDATOR_REGISTRY_LIMIT or
info.MAX_PROPOSER_SLASHINGS != MAX_PROPOSER_SLASHINGS or
info.MAX_ATTESTER_SLASHINGS != MAX_ATTESTER_SLASHINGS or
info.MAX_ATTESTATIONS != MAX_ATTESTATIONS or
info.MAX_DEPOSITS != MAX_DEPOSITS or
info.MAX_VOLUNTARY_EXITS != MAX_VOLUNTARY_EXITS or
info.DOMAIN_BEACON_PROPOSER != DOMAIN_BEACON_PROPOSER or
info.DOMAIN_BEACON_ATTESTER != DOMAIN_BEACON_ATTESTER or
info.DOMAIN_RANDAO != DOMAIN_RANDAO or
info.DOMAIN_DEPOSIT != DOMAIN_DEPOSIT or
info.DOMAIN_VOLUNTARY_EXIT != DOMAIN_VOLUNTARY_EXIT or
info.DOMAIN_SELECTION_PROOF != DOMAIN_SELECTION_PROOF or
info.DOMAIN_AGGREGATE_AND_PROOF != DOMAIN_AGGREGATE_AND_PROOF
if configFlag or genesisFlag:
node.status = RestBeaconNodeStatus.Incompatible
warn "Beacon node has incompatible configuration",
genesis_flag = genesisFlag, config_flag = configFlag
else:
info "Beacon node has compatible configuration"
node.config = some(info)
node.genesis = some(genesis)
node.status = RestBeaconNodeStatus.Online
proc checkSync*(vc: ValidatorClientRef,
node: BeaconNodeServerRef) {.async.} =
logScope: endpoint = node
let syncInfo =
try:
debug "Requesting beacon node sync status"
let res = await node.client.getSyncingStatus()
res.data.data
except CancelledError as exc:
error "Sync status request was interrupted"
node.status = RestBeaconNodeStatus.Offline
raise exc
except RestError as exc:
error "Unable to obtain beacon node's sync status",
error_name = exc.name, error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
except CatchableError as exc:
error "Unexpected exception", error_name = exc.name,
error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
node.syncInfo = some(syncInfo)
node.status =
if not(syncInfo.is_syncing) or (syncInfo.sync_distance < SYNC_TOLERANCE):
info "Beacon node is in sync", sync_distance = syncInfo.sync_distance,
head_slot = syncInfo.head_slot
RestBeaconNodeStatus.Online
else:
warn "Beacon node not in sync", sync_distance = syncInfo.sync_distance,
head_slot = syncInfo.head_slot
RestBeaconNodeStatus.NotSynced
proc checkOnline*(node: BeaconNodeServerRef) {.async.} =
logScope: endpoint = node
debug "Checking beacon node status"
let agent =
try:
let res = await node.client.getNodeVersion()
res.data.data
except CancelledError as exc:
error "Status request was interrupted"
node.status = RestBeaconNodeStatus.Offline
raise exc
except RestError as exc:
error "Unable to check beacon node's status",
error_name = exc.name, error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
except CatchableError as exc:
error "Unexpected exception", error_name = exc.name,
error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
debug "Beacon node has been identified", agent = agent.version
node.ident = some(agent.version)
node.status = RestBeaconNodeStatus.Online
proc checkNode*(vc: ValidatorClientRef,
node: BeaconNodeServerRef) {.async.} =
debug "Checking beacon node", endpoint = node
await node.checkOnline()
if node.status != RestBeaconNodeStatus.Online:
return
await vc.checkCompatible(node)
if node.status != RestBeaconNodeStatus.Online:
return
await vc.checkSync(node)
proc checkNodes*(vc: ValidatorClientRef,
nodeStatuses: set[RestBeaconNodeStatus]) {.async.} =
doAssert(RestBeaconNodeStatus.Online notin nodeStatuses)
let nodesToCheck =
vc.beaconNodes.filterIt(it.status in nodeStatuses)
let pending =
block:
var res: seq[Future[void]]
for node in nodesToCheck:
res.add(vc.checkNode(node))
res
if len(pending) > 0:
try:
await allFutures(pending)
except CancelledError as exc:
# allFutures() did not cancel passed Futures, so we need to send
# cancellation to all the children.
for fut in pending:
if not(fut.finished()):
fut.cancel()
await allFutures(pending)
raise exc
template onceToAll*(vc: ValidatorClientRef, responseType: typedesc,
timeout: Duration, body: untyped,
handlers: untyped): untyped =
var it {.inject.}: RestClientRef
var operationResult {.inject.}: bool = false
type BodyType = typeof(body)
let onlineNodes =
vc.beaconNodes.filterIt(it.status == RestBeaconNodeStatus.Online)
if len(onlineNodes) > 0:
var pending =
block:
var res: seq[BodyType]
for node {.inject.} in onlineNodes:
it = node.client
it = node.client
let fut = body
res.add(fut)
res
let opres =
try:
await allFutures(pending).wait(timeout)
ApiOperation.Success
except AsyncTimeoutError:
ApiOperation.Timeout
except CancelledError:
ApiOperation.Interrupt
for idx, node {.inject.} in onlineNodes:
it = node.client
let apiResponse {.inject.} =
block:
let fut = pending[idx]
if fut.finished():
if fut.failed() or fut.cancelled():
let exc = fut.readError()
ApiResponse[responseType].err("[" & $exc.name & "] " & $exc.msg)
else:
ApiResponse[responseType].ok(fut.read())
else:
case opres
of ApiOperation.Interrupt:
fut.cancel()
onlineNodes[idx].status = RestBeaconNodeStatus.Offline
ApiResponse[responseType].err("Operation interrupted")
of ApiOperation.Timeout:
fut.cancel()
onlineNodes[idx].status = RestBeaconNodeStatus.Offline
ApiResponse[responseType].err("Operation timeout exceeded")
of ApiOperation.Success, ApiOperation.Failure:
# This should not be happened, because all Futures should be
# finished, and `Failure` processed when Future is finished.
ApiResponse[responseType].err("Unexpected error")
node.status = handlers
if node.status == RestBeaconNodeStatus.Online:
operationResult = true
template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc,
timeout: Duration, body: untyped,
handlers: untyped): untyped =
@ -254,8 +25,7 @@ template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc,
var iterationsCount = 0
while true:
let onlineNodes =
vc.beaconNodes.filterIt(it.status == RestBeaconNodeStatus.Online)
let onlineNodes = vc.onlineNodes()
if iterationsCount != 0:
debug "Request got failed", iterations_count = iterationsCount
@ -342,76 +112,35 @@ template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc,
if exitNow:
break
let unusableModeMask = {RestBeaconNodeStatus.Offline,
RestBeaconNodeStatus.NotSynced,
RestBeaconNodeStatus.Uninitalized,
RestBeaconNodeStatus.Incompatible}
let unusableNodes = vc.beaconNodes.filterIt(it.status in unusableModeMask)
let onlineNodesCount = len(vc.beaconNodes) - len(unusableNodes)
await vc.waitOnlineNodes()
warn "No working beacon nodes available, refreshing nodes status",
online_nodes = onlineNodesCount, unusable_nodes = len(unusableNodes)
var checkFut = vc.checkNodes(unusableModeMask)
let checkOp =
block:
if isNil(timerFut):
try:
# We use `allFutures()` to keep result in `checkFut`, but still
# be able to check errors.
await allFutures(checkFut)
let onlineCount = vc.beaconNodes.countIt(
it.status == RestBeaconNodeStatus.Online)
if onlineCount == 0:
# Small pause here to avoid continous spam beacon nodes with
# checking requests.
await sleepAsync(500.milliseconds)
ApiOperation.Success
except CancelledError:
# `allFutures()` could not cancel Futures.
if not(checkFut.finished()):
checkFut.cancel()
await allFutures(checkFut)
ApiOperation.Interrupt
except CatchableError as exc:
# This only could happened if `race()` or `allFutures()` start raise
# exceptions.
ApiOperation.Failure
proc getDutyErrorMessage(response: RestPlainResponse): string =
let res = decodeBytes(RestDutyError, response.data,
response.contentType)
if res.isOk():
let errorObj = res.get()
let failures = errorObj.failures.mapIt(Base10.toString(it.index) & ": " &
it.message)
errorObj.message & ": [" & failures.join(", ") & "]"
else:
try:
discard await race(checkFut, timerFut)
if checkFut.finished():
let onlineCount = vc.beaconNodes.countIt(
it.status == RestBeaconNodeStatus.Online)
if onlineCount == 0:
# Small pause here to avoid continous spam beacon nodes with
# checking requests.
await sleepAsync(500.milliseconds)
ApiOperation.Success
"Unable to decode error response: [" & $res.error() & "]"
proc getGenericErrorMessage(response: RestPlainResponse): string =
let res = decodeBytes(RestGenericError, response.data,
response.contentType)
if res.isOk():
let errorObj = res.get()
if errorObj.stacktraces.isSome():
errorObj.message & ": [" & errorObj.stacktraces.get().join("; ") & "]"
else:
checkFut.cancel()
await allFutures(checkFut)
ApiOperation.Timeout
except CancelledError:
# `race()` and `allFutures()` could not cancel Futures.
if not(timerFut.finished()):
timerFut.cancel()
if not(checkFut.finished()):
checkFut.cancel()
await allFutures(checkFut, timerFut)
ApiOperation.Interrupt
except CatchableError as exc:
# This only could happened if `race` or `allFutures` start raise
# exceptions.
ApiOperation.Failure
errorObj.message
else:
"Unable to decode error response: [" & $res.error() & "]"
if checkOp != ApiOperation.Success:
exitNow = true
break
proc getProposerDuties*(vc: ValidatorClientRef,
epoch: Epoch): Future[GetProposerDutiesResponse] {.async.} =
proc getProposerDuties*(
vc: ValidatorClientRef,
epoch: Epoch
): Future[GetProposerDutiesResponse] {.async.} =
logScope: request = "getProposerDuties"
vc.firstSuccessTimeout(RestResponse[GetProposerDutiesResponse], SlotDuration,
getProposerDuties(it, epoch)):
@ -447,7 +176,8 @@ proc getProposerDuties*(vc: ValidatorClientRef,
proc getAttesterDuties*(
vc: ValidatorClientRef,
epoch: Epoch,
validators: seq[ValidatorIndex]): Future[GetAttesterDutiesResponse] {.async.} =
validators: seq[ValidatorIndex]
): Future[GetAttesterDutiesResponse] {.async.} =
logScope: request = "getAttesterDuties"
vc.firstSuccessTimeout(RestResponse[GetAttesterDutiesResponse], SlotDuration,
getAttesterDuties(it, epoch, validators)):
@ -483,7 +213,8 @@ proc getAttesterDuties*(
proc getSyncCommitteeDuties*(
vc: ValidatorClientRef,
epoch: Epoch,
validators: seq[ValidatorIndex]): Future[GetSyncCommitteeDutiesResponse] {.async.} =
validators: seq[ValidatorIndex]
): Future[GetSyncCommitteeDutiesResponse] {.async.} =
logScope: request = "getSyncCommitteeDuties"
vc.firstSuccessTimeout(RestResponse[GetSyncCommitteeDutiesResponse], SlotDuration,
getSyncCommitteeDuties(it, epoch, validators)):
@ -514,9 +245,12 @@ proc getSyncCommitteeDuties*(
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to retrieve sync committee duties")
raise newException(ValidatorApiError,
"Unable to retrieve sync committee duties")
proc getForkSchedule*(vc: ValidatorClientRef): Future[seq[Fork]] {.async.} =
proc getForkSchedule*(
vc: ValidatorClientRef
): Future[seq[Fork]] {.async.} =
logScope: request = "getForkSchedule"
vc.firstSuccessTimeout(RestResponse[GetForkScheduleResponse], SlotDuration,
getForkSchedule(it)):
@ -540,7 +274,9 @@ proc getForkSchedule*(vc: ValidatorClientRef): Future[seq[Fork]] {.async.} =
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to retrieve fork schedule")
proc getHeadStateFork*(vc: ValidatorClientRef): Future[Fork] {.async.} =
proc getHeadStateFork*(
vc: ValidatorClientRef
): Future[Fork] {.async.} =
logScope: request = "getHeadStateFork"
let stateIdent = StateIdent.init(StateIdentType.Head)
vc.firstSuccessTimeout(RestResponse[GetStateForkResponse], SlotDuration,
@ -570,7 +306,9 @@ proc getHeadStateFork*(vc: ValidatorClientRef): Future[Fork] {.async.} =
raise newException(ValidatorApiError, "Unable to retrieve head state's fork")
proc getHeadBlockRoot*(vc: ValidatorClientRef): Future[RestRoot] {.async.} =
proc getHeadBlockRoot*(
vc: ValidatorClientRef
): Future[RestRoot] {.async.} =
logScope: request = "getHeadBlockRoot"
let blockIdent = BlockIdent.init(BlockIdentType.Head)
vc.firstSuccessTimeout(RestResponse[GetBlockRootResponse], SlotDuration,
@ -602,7 +340,8 @@ proc getHeadBlockRoot*(vc: ValidatorClientRef): Future[RestRoot] {.async.} =
proc getValidators*(
vc: ValidatorClientRef,
id: seq[ValidatorIdent]): Future[seq[RestValidator]] {.async.} =
id: seq[ValidatorIdent]
): Future[seq[RestValidator]] {.async.} =
logScope: request = "getStateValidators"
let stateIdent = StateIdent.init(StateIdentType.Head)
vc.firstSuccessTimeout(RestResponse[GetStateValidatorsResponse], SlotDuration,
@ -636,7 +375,8 @@ proc getValidators*(
proc produceAttestationData*(
vc: ValidatorClientRef,
slot: Slot,
committee_index: CommitteeIndex): Future[AttestationData] {.async.} =
committee_index: CommitteeIndex
): Future[AttestationData] {.async.} =
logScope: request = "produceAttestationData"
vc.firstSuccessTimeout(RestResponse[ProduceAttestationDataResponse],
OneThirdDuration,
@ -670,31 +410,10 @@ proc produceAttestationData*(
raise newException(ValidatorApiError, "Unable to retrieve attestation data")
proc getDutyErrorMessage(response: RestPlainResponse): string =
let res = decodeBytes(RestDutyError, response.data,
response.contentType)
if res.isOk():
let errorObj = res.get()
let failures = errorObj.failures.mapIt(Base10.toString(it.index) & ": " &
it.message)
errorObj.message & ": [" & failures.join(", ") & "]"
else:
"Unable to decode error response: [" & $res.error() & "]"
proc getGenericErrorMessage(response: RestPlainResponse): string =
let res = decodeBytes(RestGenericError, response.data,
response.contentType)
if res.isOk():
let errorObj = res.get()
if errorObj.stacktraces.isSome():
errorObj.message & ": [" & errorObj.stacktraces.get().join("; ") & "]"
else:
errorObj.message
else:
"Unable to decode error response: [" & $res.error() & "]"
proc submitPoolAttestations*(vc: ValidatorClientRef,
data: seq[Attestation]): Future[bool] {.async.} =
proc submitPoolAttestations*(
vc: ValidatorClientRef,
data: seq[Attestation]
): Future[bool] {.async.} =
logScope: request = "submitPoolAttestations"
vc.firstSuccessTimeout(RestPlainResponse, SlotDuration,
submitPoolAttestations(it, data)):
@ -726,8 +445,10 @@ proc submitPoolAttestations*(vc: ValidatorClientRef,
raise newException(ValidatorApiError, "Unable to submit attestation")
proc submitPoolSyncCommitteeSignature*(vc: ValidatorClientRef,
data: SyncCommitteeMessage): Future[bool] {.async.} =
proc submitPoolSyncCommitteeSignature*(
vc: ValidatorClientRef,
data: SyncCommitteeMessage
): Future[bool] {.async.} =
logScope: request = "submitPoolSyncCommitteeSignatures"
let restData = RestSyncCommitteeMessage.init(
data.slot,
@ -765,8 +486,11 @@ proc submitPoolSyncCommitteeSignature*(vc: ValidatorClientRef,
raise newException(ValidatorApiError, "Unable to submit sync committee message")
proc getAggregatedAttestation*(vc: ValidatorClientRef, slot: Slot,
root: Eth2Digest): Future[Attestation] {.async.} =
proc getAggregatedAttestation*(
vc: ValidatorClientRef,
slot: Slot,
root: Eth2Digest
): Future[Attestation] {.async.} =
logScope: request = "getAggregatedAttestation"
vc.firstSuccessTimeout(RestResponse[GetAggregatedAttestationResponse],
OneThirdDuration,
@ -801,7 +525,8 @@ proc produceSyncCommitteeContribution*(
vc: ValidatorClientRef,
slot: Slot,
subcommitteeIndex: SyncSubcommitteeIndex,
root: Eth2Digest): Future[SyncCommitteeContribution] {.async.} =
root: Eth2Digest
): Future[SyncCommitteeContribution] {.async.} =
logScope: request = "produceSyncCommitteeContribution"
vc.firstSuccessTimeout(RestResponse[ProduceSyncCommitteeContributionResponse],
OneThirdDuration,
@ -838,7 +563,8 @@ proc produceSyncCommitteeContribution*(
proc publishAggregateAndProofs*(
vc: ValidatorClientRef,
data: seq[SignedAggregateAndProof]): Future[bool] {.async.} =
data: seq[SignedAggregateAndProof]
): Future[bool] {.async.} =
logScope: request = "publishAggregateAndProofs"
vc.firstSuccessTimeout(RestPlainResponse, SlotDuration,
publishAggregateAndProofs(it, data)):
@ -873,7 +599,8 @@ proc publishAggregateAndProofs*(
proc publishContributionAndProofs*(
vc: ValidatorClientRef,
data: seq[RestSignedContributionAndProof]): Future[bool] {.async.} =
data: seq[RestSignedContributionAndProof]
): Future[bool] {.async.} =
logScope: request = "publishContributionAndProofs"
vc.firstSuccessTimeout(RestPlainResponse, SlotDuration,
publishContributionAndProofs(it, data)):
@ -885,7 +612,8 @@ proc publishContributionAndProofs*(
let response = apiResponse.get()
case response.status:
of 200:
debug "Contribution and proofs were successfully published", endpoint = node
debug "Contribution and proofs were successfully published",
endpoint = node
return true
of 400:
debug "Received invalid request response",
@ -910,7 +638,8 @@ proc produceBlockV2*(
vc: ValidatorClientRef,
slot: Slot,
randao_reveal: ValidatorSig,
graffiti: GraffitiBytes): Future[ProduceBlockResponseV2] {.async.} =
graffiti: GraffitiBytes
): Future[ProduceBlockResponseV2] {.async.} =
logScope: request = "produceBlockV2"
vc.firstSuccessTimeout(RestResponse[ProduceBlockResponseV2],
SlotDuration,
@ -944,8 +673,10 @@ proc produceBlockV2*(
raise newException(ValidatorApiError, "Unable to retrieve block data")
proc publishBlock*(vc: ValidatorClientRef,
data: ForkedSignedBeaconBlock): Future[bool] {.async.} =
proc publishBlock*(
vc: ValidatorClientRef,
data: ForkedSignedBeaconBlock
): Future[bool] {.async.} =
logScope: request = "publishBlock"
vc.firstSuccessTimeout(RestPlainResponse, SlotDuration):
case data.kind
@ -995,7 +726,8 @@ proc publishBlock*(vc: ValidatorClientRef,
proc prepareBeaconCommitteeSubnet*(
vc: ValidatorClientRef,
data: seq[RestCommitteeSubscription]): Future[bool] {.async.} =
data: seq[RestCommitteeSubscription]
): Future[bool] {.async.} =
logScope: request = "prepareBeaconCommitteeSubnet"
vc.firstSuccessTimeout(RestPlainResponse, OneThirdDuration,
prepareBeaconCommitteeSubnet(it, data)):
@ -1034,7 +766,8 @@ proc prepareBeaconCommitteeSubnet*(
proc prepareSyncCommitteeSubnets*(
vc: ValidatorClientRef,
data: seq[RestSyncCommitteeSubscription]): Future[bool] {.async.} =
data: seq[RestSyncCommitteeSubscription]
): Future[bool] {.async.} =
logScope: request = "prepareSyncCommitteeSubnet"
vc.firstSuccessTimeout(RestPlainResponse, OneThirdDuration,
prepareSyncCommitteeSubnets(it, data)):

View File

@ -76,6 +76,9 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
validator = shortLog(validator),
validator_index = vindex
return false
except CancelledError:
debug "Publish attestation request was interrupted"
return false
except CatchableError as exc:
error "Unexpected error occured while publishing attestation",
attestation = shortLog(attestation),
@ -141,6 +144,9 @@ proc serveAggregateAndProof*(service: AttestationServiceRef,
validator = shortLog(validator),
validator_index = vindex
return false
except CancelledError:
debug "Publish aggregate and proofs request was interrupted"
return false
except CatchableError as exc:
error "Unexpected error occured while publishing aggregated attestation",
attestation = shortLog(signedProof.message.aggregate),
@ -257,6 +263,9 @@ proc produceAndPublishAggregates(service: AttestationServiceRef,
error "Unable to get aggregated attestation data", slot = slot,
attestation_root = shortLog(attestationRoot)
return
except CancelledError:
debug "Aggregated attestation request was interrupted"
return
except CatchableError as exc:
error "Unexpected error occured while getting aggregated attestation",
slot = slot, attestation_root = shortLog(attestationRoot),
@ -318,6 +327,9 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef,
await vc.waitForBlockPublished(slot).wait(nanoseconds(timeout.nanoseconds))
let dur = Moment.now() - startTime
debug "Block proposal awaited", slot = slot, duration = dur
except CancelledError:
debug "Block proposal waiting was interrupted"
return
except AsyncTimeoutError:
let dur = Moment.now() - startTime
debug "Block was not produced in time", slot = slot, duration = dur
@ -334,6 +346,9 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef,
error "Unable to proceed attestations", slot = slot,
committee_index = committee_index, duties_count = len(duties)
return
except CancelledError:
debug "Publish attestation request was interrupted"
return
except CatchableError as exc:
error "Unexpected error while producing attestations", slot = slot,
committee_index = committee_index, duties_count = len(duties),
@ -370,24 +385,38 @@ proc spawnAttestationTasks(service: AttestationServiceRef,
proc mainLoop(service: AttestationServiceRef) {.async.} =
let vc = service.client
service.state = ServiceState.Running
try:
debug "Service started"
while true:
# This loop could look much more nicer/better, when
# https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could
# become safe to combine loops, breaks and exception handlers.
let breakLoop =
try:
let sleepTime =
attestationSlotOffset + vc.beaconClock.durationToNextSlot()
let sres = vc.getCurrentSlot()
if sres.isSome():
let currentSlot = sres.get()
service.spawnAttestationTasks(currentSlot)
await sleepAsync(sleepTime)
false
except CancelledError:
debug "Service interrupted"
true
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
true
if breakLoop:
break
proc init*(t: typedesc[AttestationServiceRef],
vc: ValidatorClientRef): Future[AttestationServiceRef] {.async.} =
debug "Initializing service"
var res = AttestationServiceRef(client: vc, state: ServiceState.Initialized)
let res = AttestationServiceRef(name: "attestation_service",
client: vc, state: ServiceState.Initialized)
return res
proc start*(service: AttestationServiceRef) =

View File

@ -45,6 +45,7 @@ type
proposers*: seq[ValidatorPubKey]
ClientServiceRef* = ref object of RootObj
name*: string
state*: ServiceState
lifeFut*: Future[void]
client*: ValidatorClientRef
@ -52,6 +53,7 @@ type
DutiesServiceRef* = ref object of ClientServiceRef
FallbackServiceRef* = ref object of ClientServiceRef
onlineEvent*: AsyncEvent
ForkServiceRef* = ref object of ClientServiceRef
@ -125,7 +127,9 @@ type
attestationService*: AttestationServiceRef
blockService*: BlockServiceRef
syncCommitteeService*: SyncCommitteeServiceRef
runSlotLoop*: Future[void]
runSlotLoopFut*: Future[void]
sigintHandleFut*: Future[void]
sigtermHandleFut*: Future[void]
beaconClock*: BeaconClock
attachedValidators*: ValidatorPool
forks*: seq[Fork]
@ -143,7 +147,8 @@ type
const
DefaultDutyAndProof* = DutyAndProof(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64))
DefaultSyncDutyAndProof* = SyncDutyAndProof(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64))
DefaultSyncDutyAndProof* =
SyncDutyAndProof(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64))
SlotDuration* = int64(SECONDS_PER_SLOT).seconds
OneThirdDuration* = int64(SECONDS_PER_SLOT).seconds div INTERVALS_PER_SLOT
@ -168,11 +173,13 @@ chronicles.expandIt(RestAttesterDuty):
validator_committee_index = it.validator_committee_index
proc stop*(csr: ClientServiceRef) {.async.} =
debug "Stopping service", service_name = csr.name
if csr.state == ServiceState.Running:
csr.state = ServiceState.Closing
if not(csr.lifeFut.finished()):
await csr.lifeFut.cancelAndWait()
csr.state = ServiceState.Closed
debug "Service stopped", service_name = csr.name
proc isDefault*(dap: DutyAndProof): bool =
dap.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)
@ -255,7 +262,7 @@ proc getDurationToNextAttestation*(vc: ValidatorClientRef,
let duty = item.duties.getOrDefault(epoch, DefaultDutyAndProof)
if not(duty.isDefault()):
let dutySlotTime = duty.data.slot
if duty.data.slot < minSlot:
if (duty.data.slot < minSlot) and (duty.data.slot >= slot):
minSlot = duty.data.slot
if minSlot != FAR_FUTURE_SLOT:
break
@ -273,7 +280,7 @@ proc getDurationToNextBlock*(vc: ValidatorClientRef, slot: Slot): string =
if not(data.isDefault()):
for item in data.duties:
if item.duty.pubkey in vc.attachedValidators:
if item.duty.slot < minSlot:
if (item.duty.slot < minSlot) and (item.duty.slot >= slot):
minSlot = item.duty.slot
if minSlot != FAR_FUTURE_SLOT:
break
@ -336,5 +343,5 @@ proc forkAtEpoch*(vc: ValidatorClientRef, epoch: Epoch): Fork =
break
res
proc getSubcommitteeIndex*(syncCommitteeIndex: IndexInSyncCommittee): SyncSubcommitteeIndex =
SyncSubcommitteeIndex(uint16(syncCommitteeIndex) div SYNC_SUBCOMMITTEE_SIZE)
proc getSubcommitteeIndex*(index: IndexInSyncCommittee): SyncSubcommitteeIndex =
SyncSubcommitteeIndex(uint16(index) div SYNC_SUBCOMMITTEE_SIZE)

View File

@ -54,6 +54,9 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} =
except ValidatorApiError:
error "Unable to get head state's validator information"
return
except CancelledError:
debug "Validator's indices request was interrupted"
return
except CatchableError as exc:
error "Unexpected error occurred while getting validator information",
err_name = exc.name, err_msg = exc.msg
@ -104,6 +107,9 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
except ValidatorApiError:
error "Unable to get attester duties", epoch = epoch
return 0
except CancelledError:
debug "Attester duties request was interrupted"
return 0
except CatchableError as exc:
error "Unexpected error occured while getting attester duties",
epoch = epoch, err_name = exc.name, err_msg = exc.msg
@ -118,6 +124,8 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
# changed it means that some reorg was happened in beacon node and we
# should re-request all queries again.
offset = 0
duties.setLen(0)
currentRoot = none[Eth2Digest]()
continue
for item in res.data:
@ -129,7 +137,6 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
relevantDuties = duties.filterIt(
checkDuty(it) and (it.pubkey in vc.attachedValidators)
)
dependentRoot = currentRoot.get()
genesisRoot = vc.beaconGenesis.genesis_validators_root
let addOrReplaceItems =
@ -140,16 +147,16 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
let map = vc.attesters.getOrDefault(duty.pubkey)
let epochDuty = map.duties.getOrDefault(epoch, DefaultDutyAndProof)
if not(epochDuty.isDefault()):
if epochDuty.dependentRoot != dependentRoot:
if epochDuty.dependentRoot != currentRoot.get():
res.add((epoch, duty))
if not(alreadyWarned):
warn "Attester duties re-organization",
prior_dependent_root = epochDuty.dependentRoot,
dependent_root = dependentRoot
dependent_root = currentRoot.get()
alreadyWarned = true
else:
info "Received new attester duty", duty, epoch = epoch,
dependent_root = dependentRoot
dependent_root = currentRoot.get()
res.add((epoch, duty))
res
@ -175,13 +182,13 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
error "Unable to create slot signature using remote signer",
validator = shortLog(validators[index]),
error_msg = sigRes.error()
DutyAndProof.init(item.epoch, dependentRoot, item.duty,
DutyAndProof.init(item.epoch, currentRoot.get(), item.duty,
none[ValidatorSig]())
else:
DutyAndProof.init(item.epoch, dependentRoot, item.duty,
DutyAndProof.init(item.epoch, currentRoot.get(), item.duty,
some(sigRes.get()))
else:
DutyAndProof.init(item.epoch, dependentRoot, item.duty,
DutyAndProof.init(item.epoch, currentRoot.get(), item.duty,
none[ValidatorSig]())
var validatorDuties = vc.attesters.getOrDefault(item.duty.pubkey)
@ -209,6 +216,9 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef,
except ValidatorApiError:
error "Unable to get sync committee duties", epoch = epoch
return 0
except CancelledError:
debug "Request for sync committee duties was interrupted"
return 0
except CatchableError as exc:
error "Unexpected error occurred while getting sync committee duties",
epoch = epoch, err_name = exc.name, err_msg = exc.msg
@ -364,23 +374,29 @@ proc pollForSyncCommitteeDuties* (vc: ValidatorClientRef) {.async.} =
if vc.attachedValidators.count() != 0:
var counts: array[2, tuple[epoch: Epoch, count: int]]
counts[0] = (currentEpoch, await vc.pollForSyncCommitteeDuties(currentEpoch))
counts[1] = (nextEpoch, await vc.pollForSyncCommitteeDuties(nextEpoch))
counts[0] =
(currentEpoch, await vc.pollForSyncCommitteeDuties(currentEpoch))
counts[1] =
(nextEpoch, await vc.pollForSyncCommitteeDuties(nextEpoch))
if (counts[0].count == 0) and (counts[1].count == 0):
debug "No new sync committee member's duties received", slot = currentSlot
debug "No new sync committee member's duties received",
slot = currentSlot
let subscriptions =
block:
var res: seq[RestSyncCommitteeSubscription]
for item in counts:
if item.count > 0:
let subscriptionsInfo = vc.syncMembersSubscriptionInfoForEpoch(item.epoch)
let subscriptionsInfo =
vc.syncMembersSubscriptionInfoForEpoch(item.epoch)
for subInfo in subscriptionsInfo:
let sub = RestSyncCommitteeSubscription(
validator_index: subInfo.validator_index,
sync_committee_indices: subInfo.validator_sync_committee_indices,
until_epoch: (currentEpoch + EPOCHS_PER_SYNC_COMMITTEE_PERIOD -
sync_committee_indices:
subInfo.validator_sync_committee_indices,
until_epoch:
(currentEpoch + EPOCHS_PER_SYNC_COMMITTEE_PERIOD -
currentEpoch.since_sync_committee_period_start()).Epoch
)
res.add(sub)
@ -423,6 +439,8 @@ proc pollForBeaconProposers*(vc: ValidatorClientRef) {.async.} =
except ValidatorApiError:
debug "Unable to get proposer duties", slot = currentSlot,
epoch = currentEpoch
except CancelledError:
debug "Proposer duties request was interrupted"
except CatchableError as exc:
debug "Unexpected error occured while getting proposer duties",
slot = currentSlot, epoch = currentEpoch, err_name = exc.name,
@ -480,7 +498,7 @@ template checkAndRestart(serviceLoop: DutiesServiceLoop,
debug "The loop ended unexpectedly with an error",
error_name = error.name, error_msg = error.msg, loop = serviceLoop
elif future.cancelled():
debug "The loop is interrupted unexpectedly", loop = serviceLoop
debug "The loop was interrupted", loop = serviceLoop
else:
debug "The loop is finished unexpectedly without an error",
loop = serviceLoop
@ -488,8 +506,8 @@ template checkAndRestart(serviceLoop: DutiesServiceLoop,
proc mainLoop(service: DutiesServiceRef) {.async.} =
service.state = ServiceState.Running
debug "Service started"
try:
var
fut1 = service.attesterDutiesLoop()
fut2 = service.proposerDutiesLoop()
@ -497,31 +515,38 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
fut4 = service.syncCommitteeeDutiesLoop()
while true:
var breakLoop = false
# This loop could look much more nicer/better, when
# https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could
# become safe to combine loops, breaks and exception handlers.
let breakLoop =
try:
discard await race(fut1, fut2, fut3, fut4)
checkAndRestart(AttesterLoop, fut1, service.attesterDutiesLoop())
checkAndRestart(ProposerLoop, fut2, service.proposerDutiesLoop())
checkAndRestart(IndicesLoop, fut3, service.validatorIndexLoop())
checkAndRestart(SyncCommitteeLoop,
fut4, service.syncCommitteeeDutiesLoop())
false
except CancelledError:
if not(fut1.finished()): fut1.cancel()
if not(fut2.finished()): fut2.cancel()
if not(fut3.finished()): fut3.cancel()
if not(fut4.finished()): fut4.cancel()
await allFutures(fut1, fut2, fut3, fut4)
breakLoop = true
debug "Service interrupted"
true
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
true
if breakLoop:
break
checkAndRestart(AttesterLoop, fut1, service.attesterDutiesLoop())
checkAndRestart(ProposerLoop, fut2, service.proposerDutiesLoop())
checkAndRestart(IndicesLoop, fut3, service.validatorIndexLoop())
checkAndRestart(SyncCommitteeLoop, fut4, service.syncCommitteeeDutiesLoop())
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
proc init*(t: typedesc[DutiesServiceRef],
vc: ValidatorClientRef): Future[DutiesServiceRef] {.async.} =
var res = DutiesServiceRef(client: vc, state: ServiceState.Initialized)
let res = DutiesServiceRef(name: "duties_service",
client: vc, state: ServiceState.Initialized)
debug "Initializing service"
# We query for indices first, to avoid empty queries for duties.
await vc.pollForValidatorIndices()

View File

@ -1,54 +1,264 @@
import common, api
import common
logScope: service = "fallback_service"
type
BeaconNodesCounters* = object
online*: int
offline*: int
uninitalized*: int
incompatible*: int
nosync*: int
proc onlineNodes*(vc: ValidatorClientRef): seq[BeaconNodeServerRef] =
vc.beaconNodes.filterIt(it.status == RestBeaconNodeStatus.Online)
proc onlineNodesCount*(vc: ValidatorClientRef): int =
vc.beaconNodes.countIt(it.status == RestBeaconNodeStatus.Online)
proc unusableNodes*(vc: ValidatorClientRef): seq[BeaconNodeServerRef] =
vc.beaconNodes.filterIt(it.status != RestBeaconNodeStatus.Online)
proc unusableNodesCount*(vc: ValidatorClientRef): int =
vc.beaconNodes.countIt(it.status != RestBeaconNodeStatus.Online)
proc getNodeCounts*(vc: ValidatorClientRef): BeaconNodesCounters =
var res = BeaconNodesCounters()
for node in vc.beaconNodes:
case node.status
of RestBeaconNodeStatus.Uninitalized:
inc(res.uninitalized)
of RestBeaconNodeStatus.Offline:
inc(res.offline)
of RestBeaconNodeStatus.Incompatible:
inc(res.incompatible)
of RestBeaconNodeStatus.NotSynced:
inc(res.nosync)
of RestBeaconNodeStatus.Online:
inc(res.online)
res
proc waitOnlineNodes*(vc: ValidatorClientRef) {.async.} =
doAssert(not(isNil(vc.fallbackService)))
while true:
if vc.onlineNodesCount() != 0:
break
else:
if vc.fallbackService.onlineEvent.isSet():
vc.fallbackService.onlineEvent.clear()
warn "Connection with beacon node(s) has been lost",
online_nodes = vc.onlineNodesCount(),
unusable_nodes = vc.unusableNodesCount(),
total_nodes = len(vc.beaconNodes)
await vc.fallbackService.onlineEvent.wait()
proc checkCompatible(vc: ValidatorClientRef,
node: BeaconNodeServerRef) {.async.} =
logScope: endpoint = node
let info =
try:
debug "Requesting beacon node network configuration"
let res = await node.client.getSpecVC()
res.data.data
except CancelledError as exc:
debug "Configuration request was interrupted"
node.status = RestBeaconNodeStatus.Offline
raise exc
except RestError as exc:
debug "Unable to obtain beacon node's configuration",
error_name = exc.name, error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
except CatchableError as exc:
error "Unexpected exception", error_name = exc.name,
error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
let genesis =
try:
debug "Requesting beacon node genesis information"
let res = await node.client.getGenesis()
res.data.data
except CancelledError as exc:
debug "Genesis request was interrupted"
node.status = RestBeaconNodeStatus.Offline
raise exc
except RestError as exc:
debug "Unable to obtain beacon node's genesis",
error_name = exc.name, error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
except CatchableError as exc:
error "Unexpected exception", error_name = exc.name,
error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
let genesisFlag = (genesis != vc.beaconGenesis)
let configFlag =
# /!\ Keep in sync with `spec/eth2_apis/rest_types.nim` > `RestSpecVC`.
info.MAX_VALIDATORS_PER_COMMITTEE != MAX_VALIDATORS_PER_COMMITTEE or
info.SLOTS_PER_EPOCH != SLOTS_PER_EPOCH or
info.SECONDS_PER_SLOT != SECONDS_PER_SLOT or
info.EPOCHS_PER_ETH1_VOTING_PERIOD != EPOCHS_PER_ETH1_VOTING_PERIOD or
info.SLOTS_PER_HISTORICAL_ROOT != SLOTS_PER_HISTORICAL_ROOT or
info.EPOCHS_PER_HISTORICAL_VECTOR != EPOCHS_PER_HISTORICAL_VECTOR or
info.EPOCHS_PER_SLASHINGS_VECTOR != EPOCHS_PER_SLASHINGS_VECTOR or
info.HISTORICAL_ROOTS_LIMIT != HISTORICAL_ROOTS_LIMIT or
info.VALIDATOR_REGISTRY_LIMIT != VALIDATOR_REGISTRY_LIMIT or
info.MAX_PROPOSER_SLASHINGS != MAX_PROPOSER_SLASHINGS or
info.MAX_ATTESTER_SLASHINGS != MAX_ATTESTER_SLASHINGS or
info.MAX_ATTESTATIONS != MAX_ATTESTATIONS or
info.MAX_DEPOSITS != MAX_DEPOSITS or
info.MAX_VOLUNTARY_EXITS != MAX_VOLUNTARY_EXITS or
info.DOMAIN_BEACON_PROPOSER != DOMAIN_BEACON_PROPOSER or
info.DOMAIN_BEACON_ATTESTER != DOMAIN_BEACON_ATTESTER or
info.DOMAIN_RANDAO != DOMAIN_RANDAO or
info.DOMAIN_DEPOSIT != DOMAIN_DEPOSIT or
info.DOMAIN_VOLUNTARY_EXIT != DOMAIN_VOLUNTARY_EXIT or
info.DOMAIN_SELECTION_PROOF != DOMAIN_SELECTION_PROOF or
info.DOMAIN_AGGREGATE_AND_PROOF != DOMAIN_AGGREGATE_AND_PROOF
if configFlag or genesisFlag:
node.status = RestBeaconNodeStatus.Incompatible
warn "Beacon node has incompatible configuration",
genesis_flag = genesisFlag, config_flag = configFlag
else:
info "Beacon node has compatible configuration"
node.config = some(info)
node.genesis = some(genesis)
node.status = RestBeaconNodeStatus.Online
proc checkSync(vc: ValidatorClientRef,
node: BeaconNodeServerRef) {.async.} =
logScope: endpoint = node
let syncInfo =
try:
debug "Requesting beacon node sync status"
let res = await node.client.getSyncingStatus()
res.data.data
except CancelledError as exc:
debug "Sync status request was interrupted"
node.status = RestBeaconNodeStatus.Offline
raise exc
except RestError as exc:
debug "Unable to obtain beacon node's sync status",
error_name = exc.name, error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
except CatchableError as exc:
error "Unexpected exception", error_name = exc.name,
error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
node.syncInfo = some(syncInfo)
node.status =
if not(syncInfo.is_syncing) or (syncInfo.sync_distance < SYNC_TOLERANCE):
info "Beacon node is in sync", sync_distance = syncInfo.sync_distance,
head_slot = syncInfo.head_slot
RestBeaconNodeStatus.Online
else:
warn "Beacon node not in sync", sync_distance = syncInfo.sync_distance,
head_slot = syncInfo.head_slot
RestBeaconNodeStatus.NotSynced
proc checkOnline(node: BeaconNodeServerRef) {.async.} =
logScope: endpoint = node
debug "Checking beacon node status"
let agent =
try:
let res = await node.client.getNodeVersion()
res.data.data
except CancelledError as exc:
debug "Status request was interrupted"
node.status = RestBeaconNodeStatus.Offline
raise exc
except RestError as exc:
debug "Unable to check beacon node's status",
error_name = exc.name, error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
except CatchableError as exc:
error "Unexpected exception", error_name = exc.name,
error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
info "Beacon node has been identified", agent = agent.version
node.ident = some(agent.version)
node.status = RestBeaconNodeStatus.Online
proc checkNode(vc: ValidatorClientRef,
node: BeaconNodeServerRef) {.async.} =
debug "Checking beacon node", endpoint = node
await node.checkOnline()
if node.status != RestBeaconNodeStatus.Online:
return
await vc.checkCompatible(node)
if node.status != RestBeaconNodeStatus.Online:
return
await vc.checkSync(node)
proc checkNodes*(service: FallbackServiceRef) {.async.} =
let nodesToCheck =
block:
var res: seq[BeaconNodeServerRef]
for item in service.client.beaconNodes:
if item.status != RestBeaconNodeStatus.Online:
res.add(item)
res
let pendingChecks =
block:
var res: seq[Future[void]]
for item in nodesToCheck:
res.add(service.client.checkNode(item))
res
let
nodesToCheck = service.client.unusableNodes()
pendingChecks = nodesToCheck.mapIt(service.client.checkNode(it))
try:
await allFutures(pendingChecks)
except CancelledError as exc:
var pendingCancel: seq[Future[void]]
let pending =
block:
var res: seq[Future[void]]
for fut in pendingChecks:
if not(fut.finished()):
pendingCancel.add(fut.cancelAndWait())
await allFutures(pendingCancel)
res.add(fut.cancelAndWait())
res
await allFutures(pending)
raise exc
proc mainLoop(service: FallbackServiceRef) {.async.} =
let vc = service.client
service.state = ServiceState.Running
try:
debug "Service started"
while true:
# This loop could look much more nicer/better, when
# https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could
# become safe to combine loops, breaks and exception handlers.
let breakLoop =
try:
await service.checkNodes()
# Calculating time we need to sleep until
# `time(next_slot) - SLOT_LOOKAHEAD`
let waitTime =
block:
let nextTime = service.client.beaconClock.durationToNextSlot()
if nextTime < SLOT_LOOKAHEAD:
nextTime + seconds(int64(SECONDS_PER_SLOT))
await sleepAsync(2.seconds)
if service.client.onlineNodesCount() != 0:
service.onlineEvent.fire()
else:
nextTime - SLOT_LOOKAHEAD
await sleepAsync(waitTime)
let counter = vc.getNodeCounts()
warn "No suitable beacon nodes available",
online_nodes = counter.online,
offline_nodes = counter.offline,
uninitalized_nodes = counter.uninitalized,
incompatible_nodes = counter.incompatible,
nonsynced_nodes = counter.nosync,
total_nodes = len(vc.beaconNodes)
false
except CancelledError as exc:
debug "Service interrupted"
true
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
true
if breakLoop:
break
proc init*(t: typedesc[FallbackServiceRef],
vc: ValidatorClientRef): Future[FallbackServiceRef] {.async.} =
debug "Initializing service"
var res = FallbackServiceRef(client: vc, state: ServiceState.Initialized)
var res = FallbackServiceRef(name: "fallback_service", client: vc,
state: ServiceState.Initialized,
onlineEvent: newAsyncEvent())
# Perform initial nodes check.
await res.checkNodes()
return res

View File

@ -70,21 +70,35 @@ proc waitForNextEpoch(service: ForkServiceRef) {.async.} =
await sleepAsync(sleepTime)
proc mainLoop(service: ForkServiceRef) {.async.} =
service.state = ServiceState.Running
let vc = service.client
service.state = ServiceState.Running
debug "Service started"
try:
while true:
# This loop could look much more nicer/better, when
# https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could
# become safe to combine loops, breaks and exception handlers.
let breakLoop =
try:
await vc.pollForFork()
await service.waitForNextEpoch()
false
except CancelledError:
debug "Service interrupted"
true
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
true
if breakLoop:
break
proc init*(t: typedesc[ForkServiceRef],
vc: ValidatorClientRef): Future[ForkServiceRef] {.async.} =
debug "Initializing service"
var res = ForkServiceRef(client: vc, state: ServiceState.Initialized)
let res = ForkServiceRef(name: "fork_service",
client: vc, state: ServiceState.Initialized)
await vc.pollForFork()
return res

View File

@ -12,6 +12,8 @@ import
../spec/datatypes/[phase0, altair, bellatrix],
../spec/eth2_apis/rest_types
logScope: service = "sync_committee_service"
type
ContributionItem* = object
aggregator_index: uint64
@ -20,16 +22,17 @@ type
subcommitteeIdx: SyncSubcommitteeIndex
proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
slot: Slot,
beaconBlockRoot: Eth2Digest,
duty: SyncDutyAndProof): Future[bool] {.async.} =
slot: Slot, beaconBlockRoot: Eth2Digest,
duty: SyncDutyAndProof): Future[bool] {.
async.} =
let
vc = service.client
fork = vc.forkAtEpoch(slot.epoch)
genesisValidatorsRoot = vc.beaconGenesis.genesis_validators_root
vindex = duty.data.validator_index
subcommitteeIdx = getSubcommitteeIndex(duty.data.validator_sync_committee_index)
subcommitteeIdx = getSubcommitteeIndex(
duty.data.validator_sync_committee_index)
validator =
block:
@ -63,6 +66,9 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
validator = shortLog(validator),
validator_index = vindex
return false
except CancelledError:
debug "Publish sync committee message request was interrupted"
return false
except CatchableError as exc:
error "Unexpected error occurred while publishing sync committee message",
message = shortLog(message),
@ -89,14 +95,16 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
proc produceAndPublishSyncCommitteeMessages(service: SyncCommitteeServiceRef,
slot: Slot,
beaconBlockRoot: Eth2Digest,
duties: seq[SyncDutyAndProof]) {.async.} =
duties: seq[SyncDutyAndProof]) {.
async.} =
let vc = service.client
let pendingSyncCommitteeMessages =
block:
var res: seq[Future[bool]]
for duty in duties:
debug "Serving sync message duty", duty = duty.data, epoch = slot.epoch()
debug "Serving sync message duty", duty = duty.data,
epoch = slot.epoch()
res.add(service.serveSyncCommitteeMessage(slot,
beaconBlockRoot,
duty))
@ -125,14 +133,16 @@ proc produceAndPublishSyncCommitteeMessages(service: SyncCommitteeServiceRef,
(succeed, errored, failed)
let delay = vc.getDelay(slot.attestation_deadline())
debug "Sync committee message statistics", total = len(pendingSyncCommitteeMessages),
debug "Sync committee message statistics",
total = len(pendingSyncCommitteeMessages),
succeed = statistics[0], failed_to_deliver = statistics[1],
not_accepted = statistics[2], delay = delay, slot = slot,
duties_count = len(duties)
proc serveContributionAndProof*(service: SyncCommitteeServiceRef,
proof: ContributionAndProof,
validator: AttachedValidator): Future[bool] {.async.} =
validator: AttachedValidator): Future[bool] {.
async.} =
let
vc = service.client
slot = proof.contribution.slot
@ -169,6 +179,9 @@ proc serveContributionAndProof*(service: SyncCommitteeServiceRef,
validator_index = validatorIdx,
err_msg = err.msg
false
except CancelledError:
debug "Publish sync contribution request was interrupted"
return false
except CatchableError as err:
error "Unexpected error occurred while publishing sync contribution",
contribution = shortLog(proof.contribution),
@ -225,9 +238,13 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
error "Unable to get sync message contribution data", slot = slot,
beaconBlockRoot = shortLog(beaconBlockRoot)
return
except CancelledError:
debug "Request for sync message contribution was interrupted"
return
except CatchableError as exc:
error "Unexpected error occurred while getting sync message contribution",
slot = slot, beaconBlockRoot = shortLog(beaconBlockRoot),
error "Unexpected error occurred while getting sync message "&
"contribution", slot = slot,
beaconBlockRoot = shortLog(beaconBlockRoot),
err_name = exc.name, err_msg = exc.msg
return
@ -262,7 +279,8 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
(succeed, errored, failed)
let delay = vc.getDelay(slot.aggregate_deadline())
debug "Sync message contribution statistics", total = len(pendingAggregates),
debug "Sync message contribution statistics",
total = len(pendingAggregates),
succeed = statistics[0], failed_to_deliver = statistics[1],
not_accepted = statistics[2], delay = delay, slot = slot
@ -271,7 +289,8 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
slot: Slot,
duties: seq[SyncDutyAndProof]) {.async.} =
duties: seq[SyncDutyAndProof]) {.
async.} =
let
vc = service.client
startTime = Moment.now()
@ -281,6 +300,9 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
await vc.waitForBlockPublished(slot).wait(nanoseconds(timeout.nanoseconds))
let dur = Moment.now() - startTime
debug "Block proposal awaited", slot = slot, duration = dur
except CancelledError:
debug "Block proposal waiting was interrupted"
return
except AsyncTimeoutError:
let dur = Moment.now() - startTime
debug "Block was not produced in time", slot = slot, duration = dur
@ -289,13 +311,21 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
let delay = vc.getDelay(slot.sync_committee_message_deadline())
debug "Producing sync committee messages", delay = delay, slot = slot,
duties_count = len(duties)
let beaconBlockRoot =
block:
try:
let res = await vc.getHeadBlockRoot()
res.root
except ValidatorApiError as exc:
error "Unable to retrieve head block's root to sign", reason = exc.msg
return
except CancelledError:
debug "Block root request was interrupted"
return
except CatchableError as exc:
error "Could not request sync message block root to sign"
error "Unexpected error while requesting sync message block root",
err_name = exc.name, err_msg = exc.msg, slot = slot
return
try:
@ -306,6 +336,9 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
error "Unable to proceed sync committee messages", slot = slot,
duties_count = len(duties)
return
except CancelledError:
debug "Sync committee producing process was interrupted"
return
except CatchableError as exc:
error "Unexpected error while producing sync committee messages",
slot = slot,
@ -336,8 +369,14 @@ proc spawnSyncCommitteeTasks(service: SyncCommitteeServiceRef, slot: Slot) =
proc mainLoop(service: SyncCommitteeServiceRef) {.async.} =
let vc = service.client
service.state = ServiceState.Running
try:
debug "Service started"
while true:
# This loop could look much more nicer/better, when
# https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could
# become safe to combine loops, breaks and exception handlers.
let breakLoop =
try:
let sleepTime =
syncCommitteeMessageSlotOffset + vc.beaconClock.durationToNextSlot()
@ -346,15 +385,23 @@ proc mainLoop(service: SyncCommitteeServiceRef) {.async.} =
let currentSlot = sres.get()
service.spawnSyncCommitteeTasks(currentSlot)
await sleepAsync(sleepTime)
false
except CancelledError:
debug "Service interrupted"
true
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
true
if breakLoop:
break
proc init*(t: typedesc[SyncCommitteeServiceRef],
vc: ValidatorClientRef): Future[SyncCommitteeServiceRef] {.async.} =
debug "Initializing service"
var res = SyncCommitteeServiceRef(client: vc,
state: ServiceState.Initialized)
let res = SyncCommitteeServiceRef(name: "sync_committee_service",
client: vc, state: ServiceState.Initialized)
return res
proc start*(service: SyncCommitteeServiceRef) =

View File

@ -1220,7 +1220,7 @@ proc pickPasswordAndSaveWallet(rng: var HmacDrbgContext,
block:
let prompt = "Please enter a password: "
let confirm = "Please repeat the password: "
? keyboardCreatePassword(prompt, confirm).mapErr(proc(e: auto): string = $e)
? keyboardCreatePassword(prompt, confirm)
defer: burnMem(password)
var name: WalletName

2
vendor/nim-chronos vendored

@ -1 +1 @@
Subproject commit 84e32a3b695b2e54ff7733ca660bd95332b21d38
Subproject commit f2e4d447d6aec99b3641d51994650769c5c00d02