Time offset monitoring.

This commit is contained in:
cheatfate 2023-04-13 13:46:21 +03:00
parent 9266fe20af
commit 317188e3e7
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
11 changed files with 351 additions and 119 deletions

View File

@ -234,3 +234,5 @@ const
"The given merkle proof is invalid" "The given merkle proof is invalid"
InvalidMerkleProofIndexError* = InvalidMerkleProofIndexError* =
"The given merkle proof index is invalid" "The given merkle proof index is invalid"
InvalidTimestampValue* =
"Invalid or missing timestamp value"

View File

@ -427,3 +427,27 @@ proc installNimbusApiHandlers*(router: var RestRouter, node: BeaconNode) =
all_peers: allPeers all_peers: allPeers
) )
) )
router.api(MethodPost, "/nimbus/v1/timesync") do (
contentBody: Option[ContentBody]) -> RestApiResponse:
let
timestamp2 = getTimestamp()
timestamp1 =
block:
if contentBody.isNone():
return RestApiResponse.jsonError(Http400, EmptyRequestBodyError)
let dres = decodeBody(RestNimbusTimestamp1, contentBody.get())
if dres.isErr():
return RestApiResponse.jsonError(Http400,
InvalidTimestampValue,
$dres.error())
dres.get().timestamp1
# We need some gap here, otherwise `timestamp2` would be equal to
# `timestamp3`.
await sleepAsync(50.milliseconds)
let response = RestNimbusTimestamp2(
timestamp1: timestamp1,
timestamp2: timestamp2,
timestamp3: getTimestamp()
)
return RestApiResponse.jsonResponsePlain(response)

View File

@ -10,7 +10,7 @@
import std/[options, macros], import std/[options, macros],
stew/byteutils, presto, stew/byteutils, presto,
../spec/[forks], ../spec/[forks],
../spec/eth2_apis/[rest_types, eth2_rest_serialization], ../spec/eth2_apis/[rest_types, eth2_rest_serialization, rest_common],
../validators/validator_duties, ../validators/validator_duties,
../consensus_object_pools/blockchain_dag, ../consensus_object_pools/blockchain_dag,
../beacon_node, ../beacon_node,
@ -18,7 +18,7 @@ import std/[options, macros],
export export
options, eth2_rest_serialization, blockchain_dag, presto, rest_types, options, eth2_rest_serialization, blockchain_dag, presto, rest_types,
rest_constants rest_constants, rest_common
type type
ValidatorIndexError* {.pure.} = enum ValidatorIndexError* {.pure.} = enum

View File

@ -104,7 +104,8 @@ type
capella_mev.SignedBlindedBeaconBlock | capella_mev.SignedBlindedBeaconBlock |
SignedValidatorRegistrationV1 | SignedValidatorRegistrationV1 |
SignedVoluntaryExit | SignedVoluntaryExit |
Web3SignerRequest Web3SignerRequest |
RestNimbusTimestamp1
EncodeOctetTypes* = EncodeOctetTypes* =
altair.SignedBeaconBlock | altair.SignedBeaconBlock |
@ -152,7 +153,9 @@ type
GetStateRootResponse | GetStateRootResponse |
GetBlockRootResponse | GetBlockRootResponse |
SomeForkedLightClientObject | SomeForkedLightClientObject |
seq[SomeForkedLightClientObject] seq[SomeForkedLightClientObject] |
RestNimbusTimestamp1 |
RestNimbusTimestamp2
RestVersioned*[T] = object RestVersioned*[T] = object
data*: T data*: T

View File

@ -10,6 +10,9 @@ import
chronos, presto/client, chronos, presto/client,
"."/[rest_types, eth2_rest_serialization] "."/[rest_types, eth2_rest_serialization]
from std/times import Time, DateTime, toTime, fromUnix, now, utc, `-`,
inNanoseconds
export chronos, client, rest_types, eth2_rest_serialization export chronos, client, rest_types, eth2_rest_serialization
proc raiseGenericError*(resp: RestPlainResponse) {. proc raiseGenericError*(resp: RestPlainResponse) {.
@ -52,3 +55,6 @@ proc getBodyBytesWithCap*(
if not(isNil(reader)): if not(isNil(reader)):
await reader.closeWait() await reader.closeWait()
raise newHttpReadError("Could not read response") raise newHttpReadError("Could not read response")
proc getTimestamp*(): uint64 =
uint64((toTime(now().utc) - fromUnix(0)).inNanoseconds())

View File

@ -8,10 +8,46 @@
import import
chronos, presto/client, chronos, presto/client,
"."/[rest_types, eth2_rest_serialization] "."/[rest_types, eth2_rest_serialization, rest_common]
proc getValidatorsActivity*(epoch: Epoch, proc getValidatorsActivity*(epoch: Epoch,
body: seq[ValidatorIndex] body: seq[ValidatorIndex]
): RestPlainResponse {. ): RestPlainResponse {.
rest, endpoint: "/nimbus/v1/validator/activity/{epoch}", rest, endpoint: "/nimbus/v1/validator/activity/{epoch}",
meth: MethodPost.} meth: MethodPost.}
proc getTimesyncInifo*(body: RestNimbusTimestamp1): RestPlainResponse {.
rest, endpoint: "/nimbus/v1/timesync", meth: MethodPost.}
proc getTimeOffset*(client: RestClientRef): Future[int64] {.async.} =
let
timestamp1 = getTimestamp()
data = RestNimbusTimestamp1(timestamp1: timestamp1)
resp = await client.getTimesyncInifo(data)
return
case resp.status
of 200:
if resp.contentType.isNone() or
isWildCard(resp.contentType.get().mediaType) or
resp.contentType.get().mediaType != ApplicationJsonMediaType:
raise newException(RestError, "Missing or incorrect Content-Type")
let stamps = decodeBytes(RestNimbusTimestamp2, resp.data,
resp.contentType).valueOr:
raise newException(RestError, $error)
let offset = (int64(stamps.timestamp2) - int64(timestamp1)) +
(int64(stamps.timestamp3) - int64(getTimestamp()))
offset
of 400:
let error = decodeBytes(RestErrorMessage, resp.data,
resp.contentType).valueOr:
let msg = "Incorrect response error format (" & $resp.status &
") [" & $error & "]"
raise (ref RestResponseError)(msg: msg, status: resp.status)
let msg = "Error response (" & $resp.status & ") [" & error.message & "]"
raise (ref RestResponseError)(
msg: msg, status: error.code, message: error.message)
else:
raiseRestResponseError(resp)

View File

@ -625,6 +625,14 @@ type
RestRoot* = object RestRoot* = object
root*: Eth2Digest root*: Eth2Digest
RestNimbusTimestamp1* = object
timestamp1*: uint64
RestNimbusTimestamp2* = object
timestamp1*: uint64
timestamp2*: uint64
timestamp3*: uint64
# Types based on the OAPI yaml file - used in responses to requests # Types based on the OAPI yaml file - used in responses to requests
GetBeaconHeadResponse* = DataEnclosedObject[Slot] GetBeaconHeadResponse* = DataEnclosedObject[Slot]
GetAggregatedAttestationResponse* = DataEnclosedObject[Attestation] GetAggregatedAttestationResponse* = DataEnclosedObject[Attestation]

View File

@ -1153,3 +1153,11 @@ proc waitForBlock*(
let dur = Moment.now() - startTime let dur = Moment.now() - startTime
debug "Waiting for block cutoff was interrupted", duration = dur debug "Waiting for block cutoff was interrupted", duration = dur
raise exc raise exc
proc waitForNextEpoch*(service: ClientServiceRef,
delay: Duration) {.async.} =
let vc = service.client
let sleepTime = vc.beaconClock.durationToNextEpoch() + delay
debug "Sleeping until next epoch", service = service.name,
sleep_time = sleepTime, delay = delay
await sleepAsync(sleepTime)

View File

@ -21,12 +21,6 @@ proc getCheckingList*(vc: ValidatorClientRef, epoch: Epoch): seq[ValidatorIndex]
res.add validator.index.get() res.add validator.index.get()
res res
proc waitForNextEpoch(service: DoppelgangerServiceRef) {.async.} =
let vc = service.client
let sleepTime = vc.beaconClock.durationToNextEpoch() + TIME_DELAY_FROM_SLOT
debug "Sleeping until next epoch", sleep_time = sleepTime
await sleepAsync(sleepTime)
proc processActivities(service: DoppelgangerServiceRef, epoch: Epoch, proc processActivities(service: DoppelgangerServiceRef, epoch: Epoch,
activities: GetValidatorsLivenessResponse) = activities: GetValidatorsLivenessResponse) =
let vc = service.client let vc = service.client
@ -79,12 +73,12 @@ proc mainLoop(service: DoppelgangerServiceRef) {.async.} =
# before that, we can safely perform the check for epoch 0 and thus keep # before that, we can safely perform the check for epoch 0 and thus keep
# validating in epoch 1 # validating in epoch 1
if vc.beaconClock.now().slotOrZero() > GENESIS_SLOT: if vc.beaconClock.now().slotOrZero() > GENESIS_SLOT:
await service.waitForNextEpoch() await service.waitForNextEpoch(TIME_DELAY_FROM_SLOT)
while try: while try:
# Wait for the epoch to end - at the end (or really, the beginning of the # Wait for the epoch to end - at the end (or really, the beginning of the
# next one, we ask what happened # next one, we ask what happened
await service.waitForNextEpoch() await service.waitForNextEpoch(TIME_DELAY_FROM_SLOT)
let let
currentEpoch = vc.currentSlot().epoch() currentEpoch = vc.currentSlot().epoch()
previousEpoch = previousEpoch =

View File

@ -71,12 +71,6 @@ proc pollForFork(vc: ValidatorClientRef) {.async.} =
notice "Fork schedule updated", fork_schedule = sortedForks notice "Fork schedule updated", fork_schedule = sortedForks
vc.forksAvailable.fire() vc.forksAvailable.fire()
proc waitForNextEpoch(service: ForkServiceRef) {.async.} =
let vc = service.client
let sleepTime = vc.beaconClock.durationToNextEpoch() + TIME_DELAY_FROM_SLOT
debug "Sleeping until next epoch", sleep_time = sleepTime
await sleepAsync(sleepTime)
proc mainLoop(service: ForkServiceRef) {.async.} = proc mainLoop(service: ForkServiceRef) {.async.} =
let vc = service.client let vc = service.client
service.state = ServiceState.Running service.state = ServiceState.Running
@ -99,7 +93,7 @@ proc mainLoop(service: ForkServiceRef) {.async.} =
let breakLoop = let breakLoop =
try: try:
await vc.pollForFork() await vc.pollForFork()
await service.waitForNextEpoch() await service.waitForNextEpoch(TIME_DELAY_FROM_SLOT)
false false
except CancelledError: except CancelledError:
debug "Service interrupted" debug "Service interrupted"

View File

@ -10,84 +10,34 @@ from fallback_service import waitNodes, filterNodes
const const
ServiceName = "monitor_service" ServiceName = "monitor_service"
WARNING_TIME_OFFSET = 2.seconds
NOTICE_TIME_OFFSET = 1.seconds
DEBUG_TIME_OFFSET = 500.milliseconds
logScope: service = ServiceName logScope: service = ServiceName
proc blocksLoop(service: MonitorServiceRef, proc pollForEvents(service: MonitorServiceRef, node: BeaconNodeServerRef,
node: BeaconNodeServerRef) {.async.} = response: RestHttpResponseRef): Future[bool] {.async.} =
let
vc = service.client
roles = {BeaconNodeRole.BlockProposalData}
statuses = {RestBeaconNodeStatus.Synced}
logScope: logScope:
node = node node = node
debug "Block monitoring loop started"
while true:
while node.status notin statuses:
await vc.waitNodes(nil, statuses, roles, false)
let respOpt =
try:
let res = await node.client.subscribeEventStream({EventTopic.Block})
if res.status == 200:
Opt.some(res)
else:
node.status = RestBeaconNodeStatus.Incompatible
let let
body = await res.getBodyBytes() vc = service.client
plain = RestPlainResponse( events =
status: res.status,
contentType: res.contentType,
data: body)
reason = plain.getErrorMessage()
info "Unable to to obtain events stream", code = res.status,
reason = reason
Opt.none(RestHttpResponseRef)
except RestError as exc:
node.status = RestBeaconNodeStatus.Offline
info "Unable to obtain events stream", reason = $exc.msg
Opt.none(RestHttpResponseRef)
except CancelledError as exc:
debug "Block monitoring loop has been interrupted"
raise exc
except CatchableError as exc:
node.status = RestBeaconNodeStatus.Offline
info "Got an unexpected error while trying to establish event stream",
reason = $exc.msg
Opt.none(RestHttpResponseRef)
if respOpt.isNone():
continue
let response = respOpt.get()
while true:
let eventsOpt =
try: try:
let res = await response.getServerSentEvents() await response.getServerSentEvents()
Opt.some(res)
except RestError as exc: except RestError as exc:
node.status = RestBeaconNodeStatus.Offline node.status = RestBeaconNodeStatus.Offline
info "Unable to receive server-sent event", reason = $exc.msg debug "Unable to receive server-sent event", reason = $exc.msg
Opt.none(seq[ServerSentEvent]) return false
except CancelledError as exc: except CancelledError as exc:
debug "Block monitoring loop has been interrupted" debug "Block monitoring loop has been interrupted"
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
node.status = RestBeaconNodeStatus.Offline node.status = RestBeaconNodeStatus.Offline
info "Got an unexpected error, " & warn "Got an unexpected error, " &
"while reading server-sent event stream", reason = $exc.msg "while reading server-sent event stream", reason = $exc.msg
Opt.none(seq[ServerSentEvent]) return false
if eventsOpt.isNone():
break
let events = eventsOpt.get()
if len(events) == 0:
break
for event in events: for event in events:
case event.name case event.name
@ -95,7 +45,7 @@ proc blocksLoop(service: MonitorServiceRef,
let blck = EventBeaconBlockObject.decodeString(event.data).valueOr: let blck = EventBeaconBlockObject.decodeString(event.data).valueOr:
node.status = RestBeaconNodeStatus.Incompatible node.status = RestBeaconNodeStatus.Incompatible
debug "Got invalid block event format", reason = error debug "Got invalid block event format", reason = error
continue return
vc.registerBlock(blck) vc.registerBlock(blck)
of "event": of "event":
if event.data != "block": if event.data != "block":
@ -106,9 +56,86 @@ proc blocksLoop(service: MonitorServiceRef,
node.status = RestBeaconNodeStatus.Incompatible node.status = RestBeaconNodeStatus.Incompatible
debug "Got some unexpected event field", event_name = event.name debug "Got some unexpected event field", event_name = event.name
return true
proc pollForBlockEvents(service: MonitorServiceRef,
node: BeaconNodeServerRef) {.async.} =
let
vc = service.client
roles = {BeaconNodeRole.BlockProposalData}
statuses = {RestBeaconNodeStatus.Synced}
logScope:
node = node
while node.status notin statuses:
await vc.waitNodes(nil, statuses, roles, false)
let response =
try:
let res = await node.client.subscribeEventStream({EventTopic.Block})
if res.status == 200:
res
else:
node.status = RestBeaconNodeStatus.Incompatible
let
body = await res.getBodyBytes()
plain = RestPlainResponse(status: res.status,
contentType: res.contentType, data: body)
reason = plain.getErrorMessage()
info "Unable to to obtain events stream", code = res.status,
reason = reason
return
except RestError as exc:
node.status = RestBeaconNodeStatus.Offline
debug "Unable to obtain events stream", reason = $exc.msg
return
except CancelledError as exc:
debug "Block monitoring loop has been interrupted"
raise exc
except CatchableError as exc:
node.status = RestBeaconNodeStatus.Offline
warn "Got an unexpected error while trying to establish event stream",
reason = $exc.msg
return
var breakLoop = false
while not(breakLoop):
breakLoop =
try:
let res = await service.pollForEvents(node, response)
not(res)
except CancelledError as exc:
await response.closeWait()
raise exc
except CatchableError as exc:
warn "Got an unexpected error while receiving block events",
reason = $exc.msg
true
await response.closeWait() await response.closeWait()
debug "Block monitoring loop exited" proc blocksLoop(service: MonitorServiceRef,
node: BeaconNodeServerRef) {.async.} =
logScope:
node = node
debug "Block monitoring loop started"
var breakLoop = false
while not(breakLoop):
breakLoop =
try:
await service.pollForBlockEvents(node)
false
except CancelledError:
true
except CatchableError as exc:
warn "Got an unexpected error while polling for block events",
reason = $exc.msg
true
debug "Block monitoring loop stopped"
proc blockMonitoringLoop(service: MonitorServiceRef) {.async.} = proc blockMonitoringLoop(service: MonitorServiceRef) {.async.} =
let let
@ -118,14 +145,22 @@ proc blockMonitoringLoop(service: MonitorServiceRef) {.async.} =
debug "Starting main block monitoring loop", nodes_count = len(blockNodes) debug "Starting main block monitoring loop", nodes_count = len(blockNodes)
var loops: seq[Future[void]] var
while true: loops: seq[Future[void]]
for node in blockNodes: breakLoop = false
loops.add(service.blocksLoop(node))
try: try:
discard await race(loops.mapIt(FutureBase(it))) for node in blockNodes:
loops.add(service.blocksLoop(node))
except CatchableError as exc:
warn "An unexpected error occurred while starting block monitoring loop",
reason = $exc.msg, error_name = $exc.name
return
while not(breakLoop):
breakLoop =
try:
discard await race(loops.mapIt(FutureBase(it)))
for index, future in loops.pairs(): for index, future in loops.pairs():
if future.finished(): if future.finished():
let reason = let reason =
@ -139,13 +174,135 @@ proc blockMonitoringLoop(service: MonitorServiceRef) {.async.} =
reason = reason, node = blockNodes[index] reason = reason, node = blockNodes[index]
loops[index] = service.blocksLoop(blockNodes[index]) loops[index] = service.blocksLoop(blockNodes[index])
break break
false
except CancelledError as exc: except CancelledError:
debug "Main block monitoring loop was interrupted"
var pending: seq[Future[void]] var pending: seq[Future[void]]
for future in loops: for future in loops:
if not(future.finished()): pending.add(future.cancelAndWait()) if not(future.finished()): pending.add(future.cancelAndWait())
await allFutures(pending) await allFutures(pending)
true
except CatchableError as exc:
warn "An unexpected error occurred while running main block " &
" monitoring loop", reason = $exc.msg, error_name = $exc.name
true
proc pollForTime(service: MonitorServiceRef,
node: BeaconNodeServerRef) {.async.} =
let
vc = service.client
roles = AllBeaconNodeRoles
statuses = AllBeaconNodeStatuses - {RestBeaconNodeStatus.Offline}
logScope:
node = node
while node.status notin statuses:
await vc.waitNodes(nil, statuses, roles, false)
let tres =
try:
let res = await node.client.getTimeOffset()
Opt.some(res)
except RestError as exc:
debug "Unable to obtain remote beacon node time offset", reason = $exc.msg
node.status = RestBeaconNodeStatus.Offline
Opt.none(int64)
except RestResponseError as exc:
debug "Remote beacon node responds with invalid status",
status = $exc.status, reason = $exc.msg, message = $exc.message
node.status = RestBeaconNodeStatus.Incompatible
Opt.none(int64)
except CancelledError as exc:
raise exc raise exc
except CatchableError as exc:
warn "An unexpected error occurred while asking remote beacon node for " &
"time offset", reason = $exc.msg, error = $exc.name
node.status = RestBeaconNodeStatus.Offline
Opt.none(int64)
if tres.isSome():
let
timeOffset = tres.get()
timeDuration = nanoseconds(abs(tres.get()))
soffset = if tres.get() < 0: "-" & $timeDuration else: $timeDuration
if timeDuration >= WARNING_TIME_OFFSET:
warn "Remote beacon node has significant time offset", offset = soffset
elif timeDuration >= NOTICE_TIME_OFFSET:
notice "Remote beacon node has big time offset", offset = soffset
elif timeDuration >= DEBUG_TIME_OFFSET:
debug "Remote beacon node has some time offset", offset = soffset
await service.waitForNextEpoch(ZeroDuration)
proc timeLoop(service: MonitorServiceRef, node: BeaconNodeServerRef) {.async.} =
logScope:
node = node
debug "Time monitoring loop started"
var breakLoop = false
while not(breakLoop):
breakLoop =
try:
await pollForTime(service, node)
false
except CancelledError:
true
except CatchableError:
true
debug "Time monitoring loop stopped"
proc timeMonitoringLoop(service: MonitorServiceRef) {.async.} =
let
vc = service.client
blockNodes = vc.filterNodes(AllBeaconNodeStatuses, AllBeaconNodeRoles)
debug "Starting main time monitoring loop", nodes_count = len(blockNodes)
var
loops: seq[Future[void]]
breakLoop = false
try:
for node in blockNodes:
loops.add(service.timeLoop(node))
except CatchableError as exc:
warn "An unexpected error occurred while starting time monitoring loop",
reason = $exc.msg, error_name = $exc.name
return
while not(breakLoop):
breakLoop =
try:
discard await race(loops.mapIt(FutureBase(it)))
for index, future in loops.pairs():
if future.finished():
let reason =
if future.done():
"without error"
elif future.failed():
$future.readError().msg
else:
"interrupted"
debug "Block monitoring loop unexpectedly finished, restarting",
reason = reason, node = blockNodes[index]
loops[index] = service.timeLoop(blockNodes[index])
break
false
except CancelledError:
debug "Main time monitoring loop was interrupted"
var pending: seq[Future[void]]
for future in loops:
if not(future.finished()): pending.add(future.cancelAndWait())
await allFutures(pending)
true
except CatchableError as exc:
warn "An unexpected error occurred while running main time " &
" monitoring loop",
reason = $exc.msg, error_name = $exc.name
true
proc mainLoop(service: MonitorServiceRef) {.async.} = proc mainLoop(service: MonitorServiceRef) {.async.} =
let vc = service.client let vc = service.client