From 317188e3e7b7b3c7c0846560a2bd7842e2782c62 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Thu, 13 Apr 2023 13:46:21 +0300 Subject: [PATCH] Time offset monitoring. --- beacon_chain/rpc/rest_constants.nim | 2 + beacon_chain/rpc/rest_nimbus_api.nim | 24 ++ beacon_chain/rpc/rest_utils.nim | 4 +- .../eth2_apis/eth2_rest_serialization.nim | 7 +- beacon_chain/spec/eth2_apis/rest_common.nim | 6 + .../spec/eth2_apis/rest_nimbus_calls.nim | 38 +- beacon_chain/spec/eth2_apis/rest_types.nim | 8 + beacon_chain/validator_client/common.nim | 8 + .../validator_client/doppelganger_service.nim | 10 +- .../validator_client/fork_service.nim | 8 +- .../validator_client/monitor_service.nim | 355 +++++++++++++----- 11 files changed, 351 insertions(+), 119 deletions(-) diff --git a/beacon_chain/rpc/rest_constants.nim b/beacon_chain/rpc/rest_constants.nim index ca3e8951d..700b9ebf4 100644 --- a/beacon_chain/rpc/rest_constants.nim +++ b/beacon_chain/rpc/rest_constants.nim @@ -234,3 +234,5 @@ const "The given merkle proof is invalid" InvalidMerkleProofIndexError* = "The given merkle proof index is invalid" + InvalidTimestampValue* = + "Invalid or missing timestamp value" diff --git a/beacon_chain/rpc/rest_nimbus_api.nim b/beacon_chain/rpc/rest_nimbus_api.nim index ddeb72b8c..d76f46a79 100644 --- a/beacon_chain/rpc/rest_nimbus_api.nim +++ b/beacon_chain/rpc/rest_nimbus_api.nim @@ -427,3 +427,27 @@ proc installNimbusApiHandlers*(router: var RestRouter, node: BeaconNode) = all_peers: allPeers ) ) + + router.api(MethodPost, "/nimbus/v1/timesync") do ( + contentBody: Option[ContentBody]) -> RestApiResponse: + let + timestamp2 = getTimestamp() + timestamp1 = + block: + if contentBody.isNone(): + return RestApiResponse.jsonError(Http400, EmptyRequestBodyError) + let dres = decodeBody(RestNimbusTimestamp1, contentBody.get()) + if dres.isErr(): + return RestApiResponse.jsonError(Http400, + InvalidTimestampValue, + $dres.error()) + dres.get().timestamp1 + # We need some gap here, otherwise `timestamp2` would be equal to + # `timestamp3`. + await sleepAsync(50.milliseconds) + let response = RestNimbusTimestamp2( + timestamp1: timestamp1, + timestamp2: timestamp2, + timestamp3: getTimestamp() + ) + return RestApiResponse.jsonResponsePlain(response) diff --git a/beacon_chain/rpc/rest_utils.nim b/beacon_chain/rpc/rest_utils.nim index 6a78071be..aca599c26 100644 --- a/beacon_chain/rpc/rest_utils.nim +++ b/beacon_chain/rpc/rest_utils.nim @@ -10,7 +10,7 @@ import std/[options, macros], stew/byteutils, presto, ../spec/[forks], - ../spec/eth2_apis/[rest_types, eth2_rest_serialization], + ../spec/eth2_apis/[rest_types, eth2_rest_serialization, rest_common], ../validators/validator_duties, ../consensus_object_pools/blockchain_dag, ../beacon_node, @@ -18,7 +18,7 @@ import std/[options, macros], export options, eth2_rest_serialization, blockchain_dag, presto, rest_types, - rest_constants + rest_constants, rest_common type ValidatorIndexError* {.pure.} = enum diff --git a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim index bc4fa3303..b0ebf3b90 100644 --- a/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim +++ b/beacon_chain/spec/eth2_apis/eth2_rest_serialization.nim @@ -104,7 +104,8 @@ type capella_mev.SignedBlindedBeaconBlock | SignedValidatorRegistrationV1 | SignedVoluntaryExit | - Web3SignerRequest + Web3SignerRequest | + RestNimbusTimestamp1 EncodeOctetTypes* = altair.SignedBeaconBlock | @@ -152,7 +153,9 @@ type GetStateRootResponse | GetBlockRootResponse | SomeForkedLightClientObject | - seq[SomeForkedLightClientObject] + seq[SomeForkedLightClientObject] | + RestNimbusTimestamp1 | + RestNimbusTimestamp2 RestVersioned*[T] = object data*: T diff --git a/beacon_chain/spec/eth2_apis/rest_common.nim b/beacon_chain/spec/eth2_apis/rest_common.nim index cb9e30b8a..4a81517a8 100644 --- a/beacon_chain/spec/eth2_apis/rest_common.nim +++ b/beacon_chain/spec/eth2_apis/rest_common.nim @@ -10,6 +10,9 @@ import chronos, presto/client, "."/[rest_types, eth2_rest_serialization] +from std/times import Time, DateTime, toTime, fromUnix, now, utc, `-`, + inNanoseconds + export chronos, client, rest_types, eth2_rest_serialization proc raiseGenericError*(resp: RestPlainResponse) {. @@ -52,3 +55,6 @@ proc getBodyBytesWithCap*( if not(isNil(reader)): await reader.closeWait() raise newHttpReadError("Could not read response") + +proc getTimestamp*(): uint64 = + uint64((toTime(now().utc) - fromUnix(0)).inNanoseconds()) diff --git a/beacon_chain/spec/eth2_apis/rest_nimbus_calls.nim b/beacon_chain/spec/eth2_apis/rest_nimbus_calls.nim index 8f9c52b25..3ea82313d 100644 --- a/beacon_chain/spec/eth2_apis/rest_nimbus_calls.nim +++ b/beacon_chain/spec/eth2_apis/rest_nimbus_calls.nim @@ -8,10 +8,46 @@ import chronos, presto/client, - "."/[rest_types, eth2_rest_serialization] + "."/[rest_types, eth2_rest_serialization, rest_common] proc getValidatorsActivity*(epoch: Epoch, body: seq[ValidatorIndex] ): RestPlainResponse {. rest, endpoint: "/nimbus/v1/validator/activity/{epoch}", meth: MethodPost.} + +proc getTimesyncInifo*(body: RestNimbusTimestamp1): RestPlainResponse {. + rest, endpoint: "/nimbus/v1/timesync", meth: MethodPost.} + +proc getTimeOffset*(client: RestClientRef): Future[int64] {.async.} = + let + timestamp1 = getTimestamp() + data = RestNimbusTimestamp1(timestamp1: timestamp1) + resp = await client.getTimesyncInifo(data) + + return + case resp.status + of 200: + if resp.contentType.isNone() or + isWildCard(resp.contentType.get().mediaType) or + resp.contentType.get().mediaType != ApplicationJsonMediaType: + raise newException(RestError, "Missing or incorrect Content-Type") + + let stamps = decodeBytes(RestNimbusTimestamp2, resp.data, + resp.contentType).valueOr: + raise newException(RestError, $error) + + let offset = (int64(stamps.timestamp2) - int64(timestamp1)) + + (int64(stamps.timestamp3) - int64(getTimestamp())) + offset + of 400: + let error = decodeBytes(RestErrorMessage, resp.data, + resp.contentType).valueOr: + let msg = "Incorrect response error format (" & $resp.status & + ") [" & $error & "]" + raise (ref RestResponseError)(msg: msg, status: resp.status) + let msg = "Error response (" & $resp.status & ") [" & error.message & "]" + raise (ref RestResponseError)( + msg: msg, status: error.code, message: error.message) + else: + raiseRestResponseError(resp) diff --git a/beacon_chain/spec/eth2_apis/rest_types.nim b/beacon_chain/spec/eth2_apis/rest_types.nim index 695f78da8..284006ee8 100644 --- a/beacon_chain/spec/eth2_apis/rest_types.nim +++ b/beacon_chain/spec/eth2_apis/rest_types.nim @@ -625,6 +625,14 @@ type RestRoot* = object root*: Eth2Digest + RestNimbusTimestamp1* = object + timestamp1*: uint64 + + RestNimbusTimestamp2* = object + timestamp1*: uint64 + timestamp2*: uint64 + timestamp3*: uint64 + # Types based on the OAPI yaml file - used in responses to requests GetBeaconHeadResponse* = DataEnclosedObject[Slot] GetAggregatedAttestationResponse* = DataEnclosedObject[Attestation] diff --git a/beacon_chain/validator_client/common.nim b/beacon_chain/validator_client/common.nim index edcb3eff8..60e2f68d5 100644 --- a/beacon_chain/validator_client/common.nim +++ b/beacon_chain/validator_client/common.nim @@ -1153,3 +1153,11 @@ proc waitForBlock*( let dur = Moment.now() - startTime debug "Waiting for block cutoff was interrupted", duration = dur raise exc + +proc waitForNextEpoch*(service: ClientServiceRef, + delay: Duration) {.async.} = + let vc = service.client + let sleepTime = vc.beaconClock.durationToNextEpoch() + delay + debug "Sleeping until next epoch", service = service.name, + sleep_time = sleepTime, delay = delay + await sleepAsync(sleepTime) diff --git a/beacon_chain/validator_client/doppelganger_service.nim b/beacon_chain/validator_client/doppelganger_service.nim index 9cef4488b..591985240 100644 --- a/beacon_chain/validator_client/doppelganger_service.nim +++ b/beacon_chain/validator_client/doppelganger_service.nim @@ -21,12 +21,6 @@ proc getCheckingList*(vc: ValidatorClientRef, epoch: Epoch): seq[ValidatorIndex] res.add validator.index.get() res -proc waitForNextEpoch(service: DoppelgangerServiceRef) {.async.} = - let vc = service.client - let sleepTime = vc.beaconClock.durationToNextEpoch() + TIME_DELAY_FROM_SLOT - debug "Sleeping until next epoch", sleep_time = sleepTime - await sleepAsync(sleepTime) - proc processActivities(service: DoppelgangerServiceRef, epoch: Epoch, activities: GetValidatorsLivenessResponse) = let vc = service.client @@ -79,12 +73,12 @@ proc mainLoop(service: DoppelgangerServiceRef) {.async.} = # before that, we can safely perform the check for epoch 0 and thus keep # validating in epoch 1 if vc.beaconClock.now().slotOrZero() > GENESIS_SLOT: - await service.waitForNextEpoch() + await service.waitForNextEpoch(TIME_DELAY_FROM_SLOT) while try: # Wait for the epoch to end - at the end (or really, the beginning of the # next one, we ask what happened - await service.waitForNextEpoch() + await service.waitForNextEpoch(TIME_DELAY_FROM_SLOT) let currentEpoch = vc.currentSlot().epoch() previousEpoch = diff --git a/beacon_chain/validator_client/fork_service.nim b/beacon_chain/validator_client/fork_service.nim index dbd1cb53e..74960b46c 100644 --- a/beacon_chain/validator_client/fork_service.nim +++ b/beacon_chain/validator_client/fork_service.nim @@ -71,12 +71,6 @@ proc pollForFork(vc: ValidatorClientRef) {.async.} = notice "Fork schedule updated", fork_schedule = sortedForks vc.forksAvailable.fire() -proc waitForNextEpoch(service: ForkServiceRef) {.async.} = - let vc = service.client - let sleepTime = vc.beaconClock.durationToNextEpoch() + TIME_DELAY_FROM_SLOT - debug "Sleeping until next epoch", sleep_time = sleepTime - await sleepAsync(sleepTime) - proc mainLoop(service: ForkServiceRef) {.async.} = let vc = service.client service.state = ServiceState.Running @@ -99,7 +93,7 @@ proc mainLoop(service: ForkServiceRef) {.async.} = let breakLoop = try: await vc.pollForFork() - await service.waitForNextEpoch() + await service.waitForNextEpoch(TIME_DELAY_FROM_SLOT) false except CancelledError: debug "Service interrupted" diff --git a/beacon_chain/validator_client/monitor_service.nim b/beacon_chain/validator_client/monitor_service.nim index fe59871bc..de3bb7e45 100644 --- a/beacon_chain/validator_client/monitor_service.nim +++ b/beacon_chain/validator_client/monitor_service.nim @@ -10,11 +10,56 @@ from fallback_service import waitNodes, filterNodes const ServiceName = "monitor_service" + WARNING_TIME_OFFSET = 2.seconds + NOTICE_TIME_OFFSET = 1.seconds + DEBUG_TIME_OFFSET = 500.milliseconds logScope: service = ServiceName -proc blocksLoop(service: MonitorServiceRef, - node: BeaconNodeServerRef) {.async.} = +proc pollForEvents(service: MonitorServiceRef, node: BeaconNodeServerRef, + response: RestHttpResponseRef): Future[bool] {.async.} = + logScope: + node = node + + let + vc = service.client + events = + try: + await response.getServerSentEvents() + except RestError as exc: + node.status = RestBeaconNodeStatus.Offline + debug "Unable to receive server-sent event", reason = $exc.msg + return false + except CancelledError as exc: + debug "Block monitoring loop has been interrupted" + raise exc + except CatchableError as exc: + node.status = RestBeaconNodeStatus.Offline + warn "Got an unexpected error, " & + "while reading server-sent event stream", reason = $exc.msg + return false + + for event in events: + case event.name + of "data": + let blck = EventBeaconBlockObject.decodeString(event.data).valueOr: + node.status = RestBeaconNodeStatus.Incompatible + debug "Got invalid block event format", reason = error + return + vc.registerBlock(blck) + of "event": + if event.data != "block": + node.status = RestBeaconNodeStatus.Incompatible + debug "Got unexpected event name field", event_name = event.name, + event_data = event.data + else: + node.status = RestBeaconNodeStatus.Incompatible + debug "Got some unexpected event field", event_name = event.name + + return true + +proc pollForBlockEvents(service: MonitorServiceRef, + node: BeaconNodeServerRef) {.async.} = let vc = service.client roles = {BeaconNodeRole.BlockProposalData} @@ -23,92 +68,74 @@ proc blocksLoop(service: MonitorServiceRef, logScope: node = node - debug "Block monitoring loop started" + while node.status notin statuses: + await vc.waitNodes(nil, statuses, roles, false) - while true: - while node.status notin statuses: - await vc.waitNodes(nil, statuses, roles, false) + let response = + try: + let res = await node.client.subscribeEventStream({EventTopic.Block}) + if res.status == 200: + res + else: + node.status = RestBeaconNodeStatus.Incompatible + let + body = await res.getBodyBytes() + plain = RestPlainResponse(status: res.status, + contentType: res.contentType, data: body) + reason = plain.getErrorMessage() + info "Unable to to obtain events stream", code = res.status, + reason = reason + return + except RestError as exc: + node.status = RestBeaconNodeStatus.Offline + debug "Unable to obtain events stream", reason = $exc.msg + return + except CancelledError as exc: + debug "Block monitoring loop has been interrupted" + raise exc + except CatchableError as exc: + node.status = RestBeaconNodeStatus.Offline + warn "Got an unexpected error while trying to establish event stream", + reason = $exc.msg + return - let respOpt = + var breakLoop = false + while not(breakLoop): + breakLoop = try: - let res = await node.client.subscribeEventStream({EventTopic.Block}) - if res.status == 200: - Opt.some(res) - else: - node.status = RestBeaconNodeStatus.Incompatible - let - body = await res.getBodyBytes() - plain = RestPlainResponse( - status: res.status, - contentType: res.contentType, - data: body) - reason = plain.getErrorMessage() - info "Unable to to obtain events stream", code = res.status, - reason = reason - Opt.none(RestHttpResponseRef) - except RestError as exc: - node.status = RestBeaconNodeStatus.Offline - info "Unable to obtain events stream", reason = $exc.msg - Opt.none(RestHttpResponseRef) + let res = await service.pollForEvents(node, response) + not(res) except CancelledError as exc: - debug "Block monitoring loop has been interrupted" + await response.closeWait() raise exc except CatchableError as exc: - node.status = RestBeaconNodeStatus.Offline - info "Got an unexpected error while trying to establish event stream", - reason = $exc.msg - Opt.none(RestHttpResponseRef) + warn "Got an unexpected error while receiving block events", + reason = $exc.msg + true - if respOpt.isNone(): - continue + await response.closeWait() - let response = respOpt.get() +proc blocksLoop(service: MonitorServiceRef, + node: BeaconNodeServerRef) {.async.} = + logScope: + node = node - while true: - let eventsOpt = - try: - let res = await response.getServerSentEvents() - Opt.some(res) - except RestError as exc: - node.status = RestBeaconNodeStatus.Offline - info "Unable to receive server-sent event", reason = $exc.msg - Opt.none(seq[ServerSentEvent]) - except CancelledError as exc: - debug "Block monitoring loop has been interrupted" - raise exc - except CatchableError as exc: - node.status = RestBeaconNodeStatus.Offline - info "Got an unexpected error, " & - "while reading server-sent event stream", reason = $exc.msg - Opt.none(seq[ServerSentEvent]) + debug "Block monitoring loop started" - if eventsOpt.isNone(): - break + var breakLoop = false + while not(breakLoop): + breakLoop = + try: + await service.pollForBlockEvents(node) + false + except CancelledError: + true + except CatchableError as exc: + warn "Got an unexpected error while polling for block events", + reason = $exc.msg + true - let events = eventsOpt.get() - if len(events) == 0: - break - - for event in events: - case event.name - of "data": - let blck = EventBeaconBlockObject.decodeString(event.data).valueOr: - node.status = RestBeaconNodeStatus.Incompatible - debug "Got invalid block event format", reason = error - continue - vc.registerBlock(blck) - of "event": - if event.data != "block": - node.status = RestBeaconNodeStatus.Incompatible - debug "Got unexpected event name field", event_name = event.name, - event_data = event.data - else: - node.status = RestBeaconNodeStatus.Incompatible - debug "Got some unexpected event field", event_name = event.name - - await response.closeWait() - - debug "Block monitoring loop exited" + debug "Block monitoring loop stopped" proc blockMonitoringLoop(service: MonitorServiceRef) {.async.} = let @@ -118,34 +145,164 @@ proc blockMonitoringLoop(service: MonitorServiceRef) {.async.} = debug "Starting main block monitoring loop", nodes_count = len(blockNodes) - var loops: seq[Future[void]] - while true: + var + loops: seq[Future[void]] + breakLoop = false + + try: for node in blockNodes: loops.add(service.blocksLoop(node)) + except CatchableError as exc: + warn "An unexpected error occurred while starting block monitoring loop", + reason = $exc.msg, error_name = $exc.name + return + while not(breakLoop): + breakLoop = + try: + discard await race(loops.mapIt(FutureBase(it))) + for index, future in loops.pairs(): + if future.finished(): + let reason = + if future.done(): + "without error" + elif future.failed(): + $future.readError().msg + else: + "interrupted" + debug "Block monitoring loop unexpectedly finished, restarting", + reason = reason, node = blockNodes[index] + loops[index] = service.blocksLoop(blockNodes[index]) + break + false + except CancelledError: + debug "Main block monitoring loop was interrupted" + var pending: seq[Future[void]] + for future in loops: + if not(future.finished()): pending.add(future.cancelAndWait()) + await allFutures(pending) + true + except CatchableError as exc: + warn "An unexpected error occurred while running main block " & + " monitoring loop", reason = $exc.msg, error_name = $exc.name + true + +proc pollForTime(service: MonitorServiceRef, + node: BeaconNodeServerRef) {.async.} = + let + vc = service.client + roles = AllBeaconNodeRoles + statuses = AllBeaconNodeStatuses - {RestBeaconNodeStatus.Offline} + + logScope: + node = node + + while node.status notin statuses: + await vc.waitNodes(nil, statuses, roles, false) + + let tres = try: - discard await race(loops.mapIt(FutureBase(it))) - - for index, future in loops.pairs(): - if future.finished(): - let reason = - if future.done(): - "without error" - elif future.failed(): - $future.readError().msg - else: - "interrupted" - debug "Block monitoring loop unexpectedly finished, restarting", - reason = reason, node = blockNodes[index] - loops[index] = service.blocksLoop(blockNodes[index]) - break - + let res = await node.client.getTimeOffset() + Opt.some(res) + except RestError as exc: + debug "Unable to obtain remote beacon node time offset", reason = $exc.msg + node.status = RestBeaconNodeStatus.Offline + Opt.none(int64) + except RestResponseError as exc: + debug "Remote beacon node responds with invalid status", + status = $exc.status, reason = $exc.msg, message = $exc.message + node.status = RestBeaconNodeStatus.Incompatible + Opt.none(int64) except CancelledError as exc: - var pending: seq[Future[void]] - for future in loops: - if not(future.finished()): pending.add(future.cancelAndWait()) - await allFutures(pending) raise exc + except CatchableError as exc: + warn "An unexpected error occurred while asking remote beacon node for " & + "time offset", reason = $exc.msg, error = $exc.name + node.status = RestBeaconNodeStatus.Offline + Opt.none(int64) + + if tres.isSome(): + let + timeOffset = tres.get() + timeDuration = nanoseconds(abs(tres.get())) + soffset = if tres.get() < 0: "-" & $timeDuration else: $timeDuration + if timeDuration >= WARNING_TIME_OFFSET: + warn "Remote beacon node has significant time offset", offset = soffset + elif timeDuration >= NOTICE_TIME_OFFSET: + notice "Remote beacon node has big time offset", offset = soffset + elif timeDuration >= DEBUG_TIME_OFFSET: + debug "Remote beacon node has some time offset", offset = soffset + + await service.waitForNextEpoch(ZeroDuration) + +proc timeLoop(service: MonitorServiceRef, node: BeaconNodeServerRef) {.async.} = + logScope: + node = node + + debug "Time monitoring loop started" + + var breakLoop = false + while not(breakLoop): + breakLoop = + try: + await pollForTime(service, node) + false + except CancelledError: + true + except CatchableError: + true + + debug "Time monitoring loop stopped" + +proc timeMonitoringLoop(service: MonitorServiceRef) {.async.} = + let + vc = service.client + blockNodes = vc.filterNodes(AllBeaconNodeStatuses, AllBeaconNodeRoles) + + debug "Starting main time monitoring loop", nodes_count = len(blockNodes) + + var + loops: seq[Future[void]] + breakLoop = false + + try: + for node in blockNodes: + loops.add(service.timeLoop(node)) + except CatchableError as exc: + warn "An unexpected error occurred while starting time monitoring loop", + reason = $exc.msg, error_name = $exc.name + return + + while not(breakLoop): + breakLoop = + try: + discard await race(loops.mapIt(FutureBase(it))) + for index, future in loops.pairs(): + if future.finished(): + let reason = + if future.done(): + "without error" + elif future.failed(): + $future.readError().msg + else: + "interrupted" + debug "Block monitoring loop unexpectedly finished, restarting", + reason = reason, node = blockNodes[index] + loops[index] = service.timeLoop(blockNodes[index]) + break + false + except CancelledError: + debug "Main time monitoring loop was interrupted" + var pending: seq[Future[void]] + for future in loops: + if not(future.finished()): pending.add(future.cancelAndWait()) + await allFutures(pending) + true + except CatchableError as exc: + warn "An unexpected error occurred while running main time " & + " monitoring loop", + reason = $exc.msg, error_name = $exc.name + true proc mainLoop(service: MonitorServiceRef) {.async.} = let vc = service.client