VC: Remote BN clock offset monitoring. (#4846)
* Initial commit. * Add algorithm in comment. Remove delays. Fix logging statement issues. Change update from epoch to slot. * Obtain timestamp earlier. * Add processing delays into algorithm. * Fix time offset logging to produce integers instead of strings. * Address review comments. * Fix copyright year. Fix updateStatus(). * Remove fields from Slot start log statement. Fix issues when BN do not support Nimbus Extensions. Rename metric name and type change. * Add beacon role to disable time offset check manually.
This commit is contained in:
parent
bf575aac57
commit
c2c5d80a4f
|
@ -96,6 +96,7 @@ type
|
|||
Table[ValidatorPubKey, SignedValidatorRegistrationV1]
|
||||
dutyValidatorCount*: int
|
||||
## Number of validators that we've checked for activation
|
||||
processingDelay*: Opt[Duration]
|
||||
|
||||
const
|
||||
MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT
|
||||
|
|
|
@ -1310,6 +1310,8 @@ proc onSlotStart(node: BeaconNode, wallTime: BeaconTime,
|
|||
finalizedEpoch = node.dag.finalizedHead.blck.slot.epoch()
|
||||
delay = wallTime - expectedSlot.start_beacon_time()
|
||||
|
||||
node.processingDelay = Opt.some(nanoseconds(delay.nanoseconds))
|
||||
|
||||
info "Slot start",
|
||||
slot = shortLog(wallSlot),
|
||||
epoch = shortLog(wallSlot.epoch),
|
||||
|
|
|
@ -177,6 +177,8 @@ proc runVCSlotLoop(vc: ValidatorClientRef) {.async.} =
|
|||
if checkIfShouldStopAtEpoch(wallSlot, vc.config.stopAtEpoch):
|
||||
return
|
||||
|
||||
vc.processingDelay = Opt.some(nanoseconds(delay.nanoseconds))
|
||||
|
||||
if len(vc.beaconNodes) > 1:
|
||||
let
|
||||
counts = vc.getNodeCounts()
|
||||
|
@ -197,7 +199,8 @@ proc runVCSlotLoop(vc: ValidatorClientRef) {.async.} =
|
|||
blockIn = vc.getDurationToNextBlock(wallSlot),
|
||||
validators = vc.attachedValidators[].count(),
|
||||
good_nodes = goodNodes, viable_nodes = viableNodes,
|
||||
bad_nodes = badNodes, delay = shortLog(delay)
|
||||
bad_nodes = badNodes,
|
||||
delay = shortLog(delay)
|
||||
else:
|
||||
info "Slot start",
|
||||
slot = shortLog(wallSlot),
|
||||
|
|
|
@ -241,3 +241,6 @@ const
|
|||
"The given merkle proof index is invalid"
|
||||
FailedToObtainForkError* =
|
||||
"Failed to obtain fork information"
|
||||
InvalidTimestampValue* =
|
||||
"Invalid or missing timestamp value"
|
||||
|
||||
|
|
|
@ -428,3 +428,27 @@ proc installNimbusApiHandlers*(router: var RestRouter, node: BeaconNode) =
|
|||
all_peers: allPeers
|
||||
)
|
||||
)
|
||||
|
||||
router.api(MethodPost, "/nimbus/v1/timesync") do (
|
||||
contentBody: Option[ContentBody]) -> RestApiResponse:
|
||||
let
|
||||
timestamp2 = getTimestamp()
|
||||
timestamp1 =
|
||||
block:
|
||||
if contentBody.isNone():
|
||||
return RestApiResponse.jsonError(Http400, EmptyRequestBodyError)
|
||||
let dres = decodeBody(RestNimbusTimestamp1, contentBody.get())
|
||||
if dres.isErr():
|
||||
return RestApiResponse.jsonError(Http400,
|
||||
InvalidTimestampValue,
|
||||
$dres.error())
|
||||
dres.get().timestamp1
|
||||
let
|
||||
delay = node.processingDelay.valueOr: ZeroDuration
|
||||
response = RestNimbusTimestamp2(
|
||||
timestamp1: timestamp1,
|
||||
timestamp2: timestamp2,
|
||||
timestamp3: getTimestamp(),
|
||||
delay: uint64(delay.nanoseconds)
|
||||
)
|
||||
return RestApiResponse.jsonResponsePlain(response)
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
import std/[options, macros],
|
||||
stew/byteutils, presto,
|
||||
../spec/[forks],
|
||||
../spec/eth2_apis/[rest_types, eth2_rest_serialization],
|
||||
../spec/eth2_apis/[rest_types, eth2_rest_serialization, rest_common],
|
||||
../validators/validator_duties,
|
||||
../consensus_object_pools/blockchain_dag,
|
||||
../beacon_node,
|
||||
|
@ -18,7 +18,7 @@ import std/[options, macros],
|
|||
|
||||
export
|
||||
options, eth2_rest_serialization, blockchain_dag, presto, rest_types,
|
||||
rest_constants
|
||||
rest_constants, rest_common
|
||||
|
||||
type
|
||||
ValidatorIndexError* {.pure.} = enum
|
||||
|
|
|
@ -104,7 +104,8 @@ type
|
|||
capella_mev.SignedBlindedBeaconBlock |
|
||||
SignedValidatorRegistrationV1 |
|
||||
SignedVoluntaryExit |
|
||||
Web3SignerRequest
|
||||
Web3SignerRequest |
|
||||
RestNimbusTimestamp1
|
||||
|
||||
EncodeOctetTypes* =
|
||||
altair.SignedBeaconBlock |
|
||||
|
@ -151,7 +152,9 @@ type
|
|||
GetStateRootResponse |
|
||||
GetBlockRootResponse |
|
||||
SomeForkedLightClientObject |
|
||||
seq[SomeForkedLightClientObject]
|
||||
seq[SomeForkedLightClientObject] |
|
||||
RestNimbusTimestamp1 |
|
||||
RestNimbusTimestamp2
|
||||
|
||||
DecodeConsensysTypes* =
|
||||
ProduceBlockResponseV2 | ProduceBlindedBlockResponse
|
||||
|
|
|
@ -10,6 +10,9 @@ import
|
|||
chronos, presto/client,
|
||||
"."/[rest_types, eth2_rest_serialization]
|
||||
|
||||
from std/times import Time, DateTime, toTime, fromUnix, now, utc, `-`,
|
||||
inNanoseconds
|
||||
|
||||
export chronos, client, rest_types, eth2_rest_serialization
|
||||
|
||||
proc raiseGenericError*(resp: RestPlainResponse) {.
|
||||
|
@ -52,3 +55,6 @@ proc getBodyBytesWithCap*(
|
|||
if not(isNil(reader)):
|
||||
await reader.closeWait()
|
||||
raise newHttpReadError("Could not read response")
|
||||
|
||||
proc getTimestamp*(): uint64 =
|
||||
uint64((toTime(now().utc) - fromUnix(0)).inNanoseconds())
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
# Copyright (c) 2018-2022 Status Research & Development GmbH
|
||||
# Copyright (c) 2018-2023 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||
|
@ -7,11 +7,71 @@
|
|||
{.push raises: [Defect].}
|
||||
|
||||
import
|
||||
chronos, presto/client,
|
||||
"."/[rest_types, eth2_rest_serialization]
|
||||
chronos, chronicles, presto/client,
|
||||
"."/[rest_types, eth2_rest_serialization, rest_common]
|
||||
|
||||
proc getValidatorsActivity*(epoch: Epoch,
|
||||
body: seq[ValidatorIndex]
|
||||
): RestPlainResponse {.
|
||||
rest, endpoint: "/nimbus/v1/validator/activity/{epoch}",
|
||||
meth: MethodPost.}
|
||||
|
||||
proc getTimesyncInifo*(body: RestNimbusTimestamp1): RestPlainResponse {.
|
||||
rest, endpoint: "/nimbus/v1/timesync", meth: MethodPost.}
|
||||
|
||||
proc getTimeOffset*(client: RestClientRef,
|
||||
delay: Duration): Future[int64] {.async.} =
|
||||
let
|
||||
timestamp1 = getTimestamp()
|
||||
data = RestNimbusTimestamp1(timestamp1: timestamp1)
|
||||
resp = await client.getTimesyncInifo(data)
|
||||
timestamp4 = getTimestamp()
|
||||
|
||||
return
|
||||
case resp.status
|
||||
of 200:
|
||||
if resp.contentType.isNone() or
|
||||
isWildCard(resp.contentType.get().mediaType) or
|
||||
resp.contentType.get().mediaType != ApplicationJsonMediaType:
|
||||
raise newException(RestError, "Missing or incorrect Content-Type")
|
||||
|
||||
let stamps = decodeBytes(RestNimbusTimestamp2, resp.data,
|
||||
resp.contentType).valueOr:
|
||||
raise newException(RestError, $error)
|
||||
|
||||
trace "Time offset data",
|
||||
timestamp1 = timestamp1,
|
||||
timestamp2 = stamps.timestamp2,
|
||||
timestamp3 = stamps.timestamp3,
|
||||
timestamp4 = timestamp4,
|
||||
delay14 = delay.nanoseconds,
|
||||
delay23 = stamps.delay
|
||||
|
||||
# t1 - time when we sent request.
|
||||
# t2 - time when remote server received request.
|
||||
# t3 - time when remote server sent response.
|
||||
# t4 - time when we received response.
|
||||
# delay14 = validator client processing delay.
|
||||
# delay23 = beacon node processing delay.
|
||||
#
|
||||
# Round-trip network delay `delta` = (t4 - t1) - (t3 - t2)
|
||||
# but with delays this will be:
|
||||
# `delta` = (t4 - t1 + delay14) - (t3 - t2 + delay23)
|
||||
# Estimated server time is t3 + (delta div 2)
|
||||
# Estimated clock skew `theta` = t3 + (delta div 2) - t4
|
||||
let
|
||||
delay14 = delay.nanoseconds
|
||||
delay23 = int64(stamps.delay)
|
||||
offset = (int64(stamps.timestamp2) - int64(timestamp1) +
|
||||
int64(stamps.timestamp3) - int64(timestamp4) +
|
||||
delay14 - delay23) div 2
|
||||
offset
|
||||
else:
|
||||
let error = decodeBytes(RestErrorMessage, resp.data,
|
||||
resp.contentType).valueOr:
|
||||
let msg = "Incorrect response error format (" & $resp.status &
|
||||
") [" & $error & "]"
|
||||
raise (ref RestResponseError)(msg: msg, status: resp.status)
|
||||
let msg = "Error response (" & $resp.status & ") [" & error.message & "]"
|
||||
raise (ref RestResponseError)(
|
||||
msg: msg, status: error.code, message: error.message)
|
||||
|
|
|
@ -657,6 +657,15 @@ type
|
|||
RestRoot* = object
|
||||
root*: Eth2Digest
|
||||
|
||||
RestNimbusTimestamp1* = object
|
||||
timestamp1*: uint64
|
||||
|
||||
RestNimbusTimestamp2* = object
|
||||
timestamp1*: uint64
|
||||
timestamp2*: uint64
|
||||
timestamp3*: uint64
|
||||
delay*: uint64
|
||||
|
||||
# Types based on the OAPI yaml file - used in responses to requests
|
||||
GetBeaconHeadResponse* = DataEnclosedObject[Slot]
|
||||
GetAggregatedAttestationResponse* = DataEnclosedObject[Attestation]
|
||||
|
|
|
@ -11,7 +11,7 @@ import
|
|||
bearssl/rand, chronos, presto, presto/client as presto_client,
|
||||
chronicles, confutils,
|
||||
metrics, metrics/chronos_httpserver,
|
||||
".."/spec/datatypes/[phase0, altair],
|
||||
".."/spec/datatypes/[base, phase0, altair],
|
||||
".."/spec/[eth2_merkleization, helpers, signatures, validator],
|
||||
".."/spec/eth2_apis/[eth2_rest_serialization, rest_beacon_client,
|
||||
dynamic_fee_recipients],
|
||||
|
@ -28,7 +28,8 @@ export
|
|||
byteutils, presto_client, eth2_rest_serialization, rest_beacon_client,
|
||||
phase0, altair, helpers, signatures, validator, eth2_merkleization,
|
||||
beacon_clock, keystore_management, slashing_protection, validator_pool,
|
||||
dynamic_fee_recipients, Time, toUnix, fromUnix, getTime, block_pools_types
|
||||
dynamic_fee_recipients, Time, toUnix, fromUnix, getTime, block_pools_types,
|
||||
base, metrics
|
||||
|
||||
const
|
||||
SYNC_TOLERANCE* = 4'u64
|
||||
|
@ -106,7 +107,14 @@ type
|
|||
AttestationData, AttestationPublish,
|
||||
AggregatedData, AggregatedPublish,
|
||||
BlockProposalData, BlockProposalPublish,
|
||||
SyncCommitteeData, SyncCommitteePublish
|
||||
SyncCommitteeData, SyncCommitteePublish,
|
||||
NoTimeCheck
|
||||
|
||||
RestBeaconNodeFeature* {.pure.} = enum
|
||||
NoNimbusExtensions ## BN do not supports Nimbus Extensions
|
||||
|
||||
TimeOffset* = object
|
||||
value: int64
|
||||
|
||||
BeaconNodeServer* = object
|
||||
client*: RestClientRef
|
||||
|
@ -116,9 +124,11 @@ type
|
|||
genesis*: Opt[RestGenesis]
|
||||
syncInfo*: Opt[RestSyncInfo]
|
||||
status*: RestBeaconNodeStatus
|
||||
features*: set[RestBeaconNodeFeature]
|
||||
roles*: set[BeaconNodeRole]
|
||||
logIdent*: string
|
||||
index*: int
|
||||
timeOffset*: Opt[TimeOffset]
|
||||
|
||||
EpochDuties* = object
|
||||
duties*: Table[Epoch, DutyAndProof]
|
||||
|
@ -136,6 +146,7 @@ type
|
|||
Synced, ## BN and EL are synced.
|
||||
UnexpectedCode, ## BN sends unexpected/incorrect HTTP status code .
|
||||
UnexpectedResponse, ## BN sends unexpected/incorrect response.
|
||||
BrokenClock, ## BN wall clock is broken or has significan offset.
|
||||
InternalError ## BN reports internal error.
|
||||
|
||||
BeaconNodesCounters* = object
|
||||
|
@ -197,6 +208,7 @@ type
|
|||
dynamicFeeRecipientsStore*: ref DynamicFeeRecipientsStore
|
||||
validatorsRegCache*: Table[ValidatorPubKey, SignedValidatorRegistrationV1]
|
||||
blocksSeen*: Table[Slot, BlockDataItem]
|
||||
processingDelay*: Opt[Duration]
|
||||
rng*: ref HmacDrbgContext
|
||||
|
||||
ApiStrategyKind* {.pure.} = enum
|
||||
|
@ -233,8 +245,11 @@ const
|
|||
BeaconNodeRole.BlockProposalData,
|
||||
BeaconNodeRole.BlockProposalPublish,
|
||||
BeaconNodeRole.SyncCommitteeData,
|
||||
BeaconNodeRole.SyncCommitteePublish,
|
||||
BeaconNodeRole.SyncCommitteePublish
|
||||
}
|
||||
## AllBeaconNodeRoles missing BeaconNodeRole.NoTimeCheck, because timecheks
|
||||
## are enabled by default.
|
||||
|
||||
AllBeaconNodeStatuses* = {
|
||||
RestBeaconNodeStatus.Offline,
|
||||
RestBeaconNodeStatus.Online,
|
||||
|
@ -245,9 +260,22 @@ const
|
|||
RestBeaconNodeStatus.Synced,
|
||||
RestBeaconNodeStatus.UnexpectedCode,
|
||||
RestBeaconNodeStatus.UnexpectedResponse,
|
||||
RestBeaconNodeStatus.BrokenClock,
|
||||
RestBeaconNodeStatus.InternalError
|
||||
}
|
||||
|
||||
proc `$`*(to: TimeOffset): string =
|
||||
if to.value < 0:
|
||||
"-" & $chronos.nanoseconds(-to.value)
|
||||
else:
|
||||
$chronos.nanoseconds(to.value)
|
||||
|
||||
chronicles.formatIt(TimeOffset):
|
||||
$it
|
||||
|
||||
chronicles.formatIt(Opt[TimeOffset]):
|
||||
if it.isSome(): $(it.get()) else: "<unknown>"
|
||||
|
||||
proc `$`*(roles: set[BeaconNodeRole]): string =
|
||||
if card(roles) > 0:
|
||||
if roles != AllBeaconNodeRoles:
|
||||
|
@ -270,6 +298,8 @@ proc `$`*(roles: set[BeaconNodeRole]): string =
|
|||
res.add("sync-data")
|
||||
if BeaconNodeRole.SyncCommitteePublish in roles:
|
||||
res.add("sync-publish")
|
||||
if BeaconNodeRole.NoTimeCheck in roles:
|
||||
res.add("no-timecheck")
|
||||
res.join(",")
|
||||
else:
|
||||
"{all}"
|
||||
|
@ -288,6 +318,7 @@ proc `$`*(status: RestBeaconNodeStatus): string =
|
|||
of RestBeaconNodeStatus.UnexpectedCode: "unexpected code"
|
||||
of RestBeaconNodeStatus.UnexpectedResponse: "unexpected data"
|
||||
of RestBeaconNodeStatus.InternalError: "internal error"
|
||||
of RestBeaconNodeStatus.BrokenClock: "broken clock"
|
||||
|
||||
proc `$`*(failure: ApiFailure): string =
|
||||
case failure
|
||||
|
@ -356,7 +387,7 @@ proc getFailureReason*(exc: ref ValidatorApiError): string =
|
|||
exc.msg
|
||||
|
||||
proc shortLog*(roles: set[BeaconNodeRole]): string =
|
||||
var r = "AGBSD"
|
||||
var r = "AGBSDT"
|
||||
if BeaconNodeRole.AttestationData in roles:
|
||||
if BeaconNodeRole.AttestationPublish in roles: r[0] = 'A' else: r[0] = 'a'
|
||||
else:
|
||||
|
@ -376,6 +407,7 @@ proc shortLog*(roles: set[BeaconNodeRole]): string =
|
|||
if BeaconNodeRole.SyncCommitteePublish in roles:
|
||||
r[3] = '+' else: r[3] = '-'
|
||||
if BeaconNodeRole.Duties in roles: r[4] = 'D' else: r[4] = '-'
|
||||
if BeaconNodeRole.NoTimeCheck notin roles: r[5] = 'T' else: r[5] = '-'
|
||||
r
|
||||
|
||||
proc `$`*(bn: BeaconNodeServerRef): string =
|
||||
|
@ -513,6 +545,10 @@ proc updateStatus*(node: BeaconNodeServerRef,
|
|||
warn "Beacon node reports internal error",
|
||||
reason = failure.getFailureReason()
|
||||
node.status = status
|
||||
of RestBeaconNodeStatus.BrokenClock:
|
||||
if node.status != status:
|
||||
warn "Beacon node's clock is out of order, (beacon node is unusable)"
|
||||
node.status = status
|
||||
|
||||
proc stop*(csr: ClientServiceRef) {.async.} =
|
||||
debug "Stopping service", service = csr.name
|
||||
|
@ -573,8 +609,12 @@ proc parseRoles*(data: string): Result[set[BeaconNodeRole], cstring] =
|
|||
res.incl(BeaconNodeRole.SyncCommitteePublish)
|
||||
of "duties":
|
||||
res.incl(BeaconNodeRole.Duties)
|
||||
of "no-timecheck":
|
||||
res.incl(BeaconNodeRole.NoTimeCheck)
|
||||
else:
|
||||
return err("Invalid beacon node role string found")
|
||||
if res == {BeaconNodeRole.NoTimeCheck}:
|
||||
res.incl(AllBeaconNodeRoles)
|
||||
ok(res)
|
||||
|
||||
proc normalizeUri*(r: Uri): Result[Uri, cstring] =
|
||||
|
@ -613,18 +653,14 @@ proc init*(t: typedesc[BeaconNodeServerRef], remote: Uri,
|
|||
doAssert(index >= 0)
|
||||
let
|
||||
flags = {RestClientFlag.CommaSeparatedArray}
|
||||
socketFlags = {SocketFlags.TcpNoDelay}
|
||||
remoteUri = normalizeUri(remote).valueOr:
|
||||
return err("Invalid URL: " & $error)
|
||||
client =
|
||||
block:
|
||||
let res = RestClientRef.new($remoteUri, flags = flags)
|
||||
if res.isErr(): return err($res.error())
|
||||
res.get()
|
||||
roles =
|
||||
block:
|
||||
let res = parseRoles(remoteUri.anchor)
|
||||
if res.isErr(): return err($res.error())
|
||||
res.get()
|
||||
return err($error)
|
||||
client = RestClientRef.new($remoteUri, flags = flags,
|
||||
socketFlags = socketFlags).valueOr:
|
||||
return err($error)
|
||||
roles = parseRoles(remoteUri.anchor).valueOr:
|
||||
return err($error)
|
||||
|
||||
let server = BeaconNodeServerRef(
|
||||
client: client, endpoint: $remoteUri, index: index, roles: roles,
|
||||
|
@ -1235,3 +1271,41 @@ proc waitForBlock*(
|
|||
iterator chunks*[T](data: openArray[T], maxCount: Positive): seq[T] =
|
||||
for i in countup(0, len(data) - 1, maxCount):
|
||||
yield @(data.toOpenArray(i, min(i + maxCount, len(data)) - 1))
|
||||
|
||||
func init*(t: typedesc[TimeOffset], duration: Duration): TimeOffset =
|
||||
TimeOffset(value: duration.nanoseconds)
|
||||
|
||||
func init*(t: typedesc[TimeOffset], offset: int64): TimeOffset =
|
||||
TimeOffset(value: offset)
|
||||
|
||||
func abs*(to: TimeOffset): TimeOffset =
|
||||
TimeOffset(value: abs(to.value))
|
||||
|
||||
func milliseconds*(to: TimeOffset): int64 =
|
||||
if to.value < 0:
|
||||
-nanoseconds(-to.value).milliseconds
|
||||
else:
|
||||
nanoseconds(-to.value).milliseconds
|
||||
|
||||
func `<`*(a, b: TimeOffset): bool = a.value < b.value
|
||||
func `<=`*(a, b: TimeOffset): bool = a.value <= b.value
|
||||
func `==`*(a, b: TimeOffset): bool = a.value == b.value
|
||||
|
||||
func nanoseconds*(to: TimeOffset): int64 = to.value
|
||||
|
||||
proc waitForNextEpoch*(service: ClientServiceRef,
|
||||
delay: Duration) {.async.} =
|
||||
let
|
||||
vc = service.client
|
||||
sleepTime = vc.beaconClock.durationToNextEpoch() + delay
|
||||
debug "Sleeping until next epoch", service = service.name,
|
||||
sleep_time = sleepTime, delay = delay
|
||||
await sleepAsync(sleepTime)
|
||||
|
||||
proc waitForNextEpoch*(service: ClientServiceRef): Future[void] =
|
||||
waitForNextEpoch(service, ZeroDuration)
|
||||
|
||||
proc waitForNextSlot*(service: ClientServiceRef) {.async.} =
|
||||
let vc = service.client
|
||||
let sleepTime = vc.beaconClock.durationToNextSlot()
|
||||
await sleepAsync(sleepTime)
|
||||
|
|
|
@ -21,12 +21,6 @@ proc getCheckingList*(vc: ValidatorClientRef, epoch: Epoch): seq[ValidatorIndex]
|
|||
res.add validator.index.get()
|
||||
res
|
||||
|
||||
proc waitForNextEpoch(service: DoppelgangerServiceRef) {.async.} =
|
||||
let vc = service.client
|
||||
let sleepTime = vc.beaconClock.durationToNextEpoch() + TIME_DELAY_FROM_SLOT
|
||||
debug "Sleeping until next epoch", sleep_time = sleepTime
|
||||
await sleepAsync(sleepTime)
|
||||
|
||||
proc processActivities(service: DoppelgangerServiceRef, epoch: Epoch,
|
||||
activities: GetValidatorsLivenessResponse) =
|
||||
let vc = service.client
|
||||
|
@ -79,12 +73,12 @@ proc mainLoop(service: DoppelgangerServiceRef) {.async.} =
|
|||
# before that, we can safely perform the check for epoch 0 and thus keep
|
||||
# validating in epoch 1
|
||||
if vc.beaconClock.now().slotOrZero() > GENESIS_SLOT:
|
||||
await service.waitForNextEpoch()
|
||||
await service.waitForNextEpoch(TIME_DELAY_FROM_SLOT)
|
||||
|
||||
while try:
|
||||
# Wait for the epoch to end - at the end (or really, the beginning of the
|
||||
# next one, we ask what happened
|
||||
await service.waitForNextEpoch()
|
||||
await service.waitForNextEpoch(TIME_DELAY_FROM_SLOT)
|
||||
let
|
||||
currentEpoch = vc.currentSlot().epoch()
|
||||
previousEpoch =
|
||||
|
|
|
@ -10,6 +10,23 @@ import common
|
|||
const
|
||||
ServiceName = "fallback_service"
|
||||
|
||||
FAIL_TIME_OFFSETS = [
|
||||
TimeOffset.init(-(MAXIMUM_GOSSIP_CLOCK_DISPARITY.nanoseconds)),
|
||||
TimeOffset.init(MAXIMUM_GOSSIP_CLOCK_DISPARITY.nanoseconds * 4)
|
||||
]
|
||||
WARN_TIME_OFFSETS = [
|
||||
TimeOffset.init(-(MAXIMUM_GOSSIP_CLOCK_DISPARITY.nanoseconds div 2)),
|
||||
TimeOffset.init(MAXIMUM_GOSSIP_CLOCK_DISPARITY.nanoseconds * 2),
|
||||
]
|
||||
NOTE_TIME_OFFSETS = [
|
||||
TimeOffset.init(-(MAXIMUM_GOSSIP_CLOCK_DISPARITY.nanoseconds div 4)),
|
||||
TimeOffset.init(MAXIMUM_GOSSIP_CLOCK_DISPARITY.nanoseconds),
|
||||
]
|
||||
|
||||
declareGauge validator_client_time_offset,
|
||||
"Wall clock offset(s) between validator client and beacon node(s)",
|
||||
labels = ["node"]
|
||||
|
||||
logScope: service = ServiceName
|
||||
|
||||
proc nodesCount*(vc: ValidatorClientRef,
|
||||
|
@ -280,15 +297,142 @@ proc checkNodes*(service: FallbackServiceRef): Future[bool] {.async.} =
|
|||
raise exc
|
||||
return res
|
||||
|
||||
proc checkOffsetStatus(node: BeaconNodeServerRef, offset: TimeOffset) =
|
||||
logScope:
|
||||
node = node
|
||||
|
||||
node.timeOffset = Opt.some(offset)
|
||||
validator_client_time_offset.set(float64(offset.milliseconds()), @[$node])
|
||||
|
||||
debug "Beacon node time offset", time_offset = offset
|
||||
|
||||
let updateStatus =
|
||||
if (offset <= WARN_TIME_OFFSETS[0]) or (offset >= WARN_TIME_OFFSETS[1]):
|
||||
warn "Beacon node has significant time offset",
|
||||
time_offset = offset
|
||||
if (offset <= FAIL_TIME_OFFSETS[0]) or (offset >= FAIL_TIME_OFFSETS[1]):
|
||||
# Beacon node's clock is out of acceptable offsets, we marking this
|
||||
# beacon node and remote it from the list of working nodes.
|
||||
warn "Beacon node has enormous time offset",
|
||||
time_offset = offset
|
||||
let failure = ApiNodeFailure.init(ApiFailure.NoError,
|
||||
"checkTimeOffsetStatus()", node, 200,
|
||||
"Beacon node has enormous time offset")
|
||||
node.updateStatus(RestBeaconNodeStatus.BrokenClock, failure)
|
||||
false
|
||||
else:
|
||||
true
|
||||
elif (offset <= NOTE_TIME_OFFSETS[0]) or (offset >= NOTE_TIME_OFFSETS[1]):
|
||||
info "Beacon node has notable time offset",
|
||||
time_offset = offset
|
||||
true
|
||||
else:
|
||||
true
|
||||
|
||||
if updateStatus:
|
||||
if node.status == RestBeaconNodeStatus.BrokenClock:
|
||||
# Beacon node's clock has been recovered to some acceptable offset, so we
|
||||
# could restore beacon node.
|
||||
let failure = ApiNodeFailure.init(ApiFailure.NoError,
|
||||
"checkTimeOffsetStatus()", node, 200,
|
||||
"Beacon node has acceptable time offset")
|
||||
node.updateStatus(RestBeaconNodeStatus.Offline, failure)
|
||||
|
||||
proc runTimeMonitor(service: FallbackServiceRef,
|
||||
node: BeaconNodeServerRef) {.async.} =
|
||||
const NimbusExtensionsLog = "Beacon node do not support nimbus extensions"
|
||||
let
|
||||
vc = service.client
|
||||
roles = AllBeaconNodeRoles
|
||||
statuses = AllBeaconNodeStatuses - {RestBeaconNodeStatus.Offline}
|
||||
|
||||
logScope:
|
||||
node = node
|
||||
|
||||
if BeaconNodeRole.NoTimeCheck in node.roles:
|
||||
debug "Beacon node time offset checks disabled"
|
||||
return
|
||||
|
||||
while true:
|
||||
while node.status notin statuses:
|
||||
await vc.waitNodes(nil, statuses, roles, true)
|
||||
|
||||
if RestBeaconNodeFeature.NoNimbusExtensions in node.features:
|
||||
return
|
||||
|
||||
let tres =
|
||||
try:
|
||||
let
|
||||
delay = vc.processingDelay.valueOr: ZeroDuration
|
||||
res = await node.client.getTimeOffset(delay)
|
||||
Opt.some(res)
|
||||
except RestResponseError as exc:
|
||||
case exc.status
|
||||
of 400:
|
||||
debug "Beacon node returns invalid response",
|
||||
status = $exc.status, reason = $exc.msg,
|
||||
error_message = $exc.message
|
||||
else:
|
||||
notice NimbusExtensionsLog, status = $exc.status,
|
||||
reason = $exc.msg, error_message = $exc.message
|
||||
# Exiting loop
|
||||
node.features.incl(RestBeaconNodeFeature.NoNimbusExtensions)
|
||||
return
|
||||
except RestError as exc:
|
||||
debug "Unable to obtain beacon node's time offset", reason = $exc.msg
|
||||
notice NimbusExtensionsLog
|
||||
node.features.incl(RestBeaconNodeFeature.NoNimbusExtensions)
|
||||
return
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
warn "An unexpected error occurred while asking for time offset",
|
||||
reason = $exc.msg, error = $exc.name
|
||||
notice NimbusExtensionsLog
|
||||
node.features.incl(RestBeaconNodeFeature.NoNimbusExtensions)
|
||||
return
|
||||
|
||||
if tres.isSome():
|
||||
checkOffsetStatus(node, TimeOffset.init(tres.get()))
|
||||
else:
|
||||
debug "Beacon node's time offset was not updated"
|
||||
|
||||
await service.waitForNextSlot()
|
||||
|
||||
proc processTimeMonitoring(service: FallbackServiceRef) {.async.} =
|
||||
let
|
||||
vc = service.client
|
||||
blockNodes = vc.filterNodes(AllBeaconNodeStatuses, AllBeaconNodeRoles)
|
||||
|
||||
var pendingChecks: seq[Future[void]]
|
||||
|
||||
try:
|
||||
for node in blockNodes:
|
||||
pendingChecks.add(service.runTimeMonitor(node))
|
||||
await allFutures(pendingChecks)
|
||||
except CancelledError as exc:
|
||||
var pending: seq[Future[void]]
|
||||
for future in pendingChecks:
|
||||
if not(future.finished()): pending.add(future.cancelAndWait())
|
||||
await allFutures(pending)
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
warn "An unexpected error occurred while running time monitoring",
|
||||
reason = $exc.msg, error = $exc.name
|
||||
return
|
||||
|
||||
proc mainLoop(service: FallbackServiceRef) {.async.} =
|
||||
let vc = service.client
|
||||
service.state = ServiceState.Running
|
||||
debug "Service started"
|
||||
|
||||
let timeMonitorFut = processTimeMonitoring(service)
|
||||
|
||||
try:
|
||||
await vc.preGenesisEvent.wait()
|
||||
except CancelledError:
|
||||
debug "Service interrupted"
|
||||
if not(timeMonitorFut.finished()): await timeMonitorFut.cancelAndWait()
|
||||
return
|
||||
except CatchableError as exc:
|
||||
warn "Service crashed with unexpected error", err_name = exc.name,
|
||||
|
@ -306,6 +450,7 @@ proc mainLoop(service: FallbackServiceRef) {.async.} =
|
|||
false
|
||||
except CancelledError as exc:
|
||||
debug "Service interrupted"
|
||||
if not(timeMonitorFut.finished()): await timeMonitorFut.cancelAndWait()
|
||||
true
|
||||
except CatchableError as exc:
|
||||
error "Service crashed with unexpected error", err_name = exc.name,
|
||||
|
|
|
@ -71,12 +71,6 @@ proc pollForFork(vc: ValidatorClientRef) {.async.} =
|
|||
notice "Fork schedule updated", fork_schedule = sortedForks
|
||||
vc.forksAvailable.fire()
|
||||
|
||||
proc waitForNextEpoch(service: ForkServiceRef) {.async.} =
|
||||
let vc = service.client
|
||||
let sleepTime = vc.beaconClock.durationToNextEpoch() + TIME_DELAY_FROM_SLOT
|
||||
debug "Sleeping until next epoch", sleep_time = sleepTime
|
||||
await sleepAsync(sleepTime)
|
||||
|
||||
proc mainLoop(service: ForkServiceRef) {.async.} =
|
||||
let vc = service.client
|
||||
service.state = ServiceState.Running
|
||||
|
@ -99,7 +93,7 @@ proc mainLoop(service: ForkServiceRef) {.async.} =
|
|||
let breakLoop =
|
||||
try:
|
||||
await vc.pollForFork()
|
||||
await service.waitForNextEpoch()
|
||||
await service.waitForNextEpoch(TIME_DELAY_FROM_SLOT)
|
||||
false
|
||||
except CancelledError:
|
||||
debug "Service interrupted"
|
||||
|
|
Loading…
Reference in New Issue