Eugene Kabanov b51152153a
VC: Hardening and optimizing time handling. (#4743)
* Fix durationToNextSlot() and durationToNextEpoch() to work not only after Genesis, but also before Genesis.
Change VC pre-genesis behavior, add runPreGenesisWaitingLoop() and runGenesisWaitingLoop().
Add checkedWaitForSlot() and checkedWaitForNextSlot() to strictly check current time and print warnings.
Fix VC main loop to use checkedWaitForNextSlot().
Fix attestation_service to run attestations processing only until the end of the duty slot.
Change attestation_service main loop to use checkedWaitForNextSlot().
Change block_service to properly cancel all the pending proposer tasks.
Use checkedWaitForSlot to wait for block proposal.
Fix block_service waitForBlockPublished() to be compatible with BN.
Fix sync_committee_service to avoid asyncSpawn.
Fix sync_committee_service to run only until the end of the duty slot.
Fix sync_committee_service to use checkedWaitForNextSlot().

* Refactor validator logging.
Fix aggregated attestation publishing missing delay.

* Fix doppelganger detection should not start at pre-genesis time.
Fix fallback service sync status spam.
Fix false `sync committee subnets subscription error`.

* Address review comments part 1.

* Address review comments.

* Fix condition issue for near genesis waiting loop.

* Address review comments.

* Address review comments 2.
2023-04-17 21:31:54 +00:00

125 lines
3.9 KiB
Nim

# beacon_chain
# Copyright (c) 2021-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import std/algorithm
import chronicles
import common, api
const
ServiceName = "fork_service"
logScope: service = ServiceName
proc validateForkSchedule(forks: openArray[Fork]): bool {.raises: [Defect].} =
# Check if `forks` list is linked list.
var current_version = forks[0].current_version
for index, item in forks:
if index > 0:
if item.previous_version != current_version:
return false
else:
if item.previous_version != item.current_version:
return false
current_version = item.current_version
true
proc sortForks(forks: openArray[Fork]): Result[seq[Fork], cstring] {.
raises: [Defect].} =
proc cmp(x, y: Fork): int {.closure.} =
if uint64(x.epoch) == uint64(y.epoch): return 0
if uint64(x.epoch) < uint64(y.epoch): return -1
return 1
if len(forks) == 0:
return err("Empty fork schedule")
let sortedForks = sorted(forks, cmp)
if not(validateForkSchedule(sortedForks)):
return err("Invalid fork schedule")
ok(sortedForks)
proc pollForFork(vc: ValidatorClientRef) {.async.} =
let forks =
try:
await vc.getForkSchedule(ApiStrategyKind.Best)
except ValidatorApiError as exc:
warn "Unable to retrieve fork schedule",
reason = exc.getFailureReason(), err_msg = exc.msg
return
except CancelledError as exc:
debug "Fork retrieval process was interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error occured while getting fork information",
err_name = exc.name, err_msg = exc.msg
return
let sortedForks =
block:
let res = sortForks(forks)
if res.isErr():
warn "Invalid fork schedule received", reason = res.error()
return
res.get()
if (len(vc.forks) == 0) or (vc.forks != sortedForks):
vc.forks = sortedForks
notice "Fork schedule updated", fork_schedule = sortedForks
vc.forksAvailable.fire()
proc waitForNextEpoch(service: ForkServiceRef) {.async.} =
let vc = service.client
let sleepTime = vc.beaconClock.durationToNextEpoch() + TIME_DELAY_FROM_SLOT
debug "Sleeping until next epoch", sleep_time = sleepTime
await sleepAsync(sleepTime)
proc mainLoop(service: ForkServiceRef) {.async.} =
let vc = service.client
service.state = ServiceState.Running
debug "Service started"
try:
await vc.preGenesisEvent.wait()
except CancelledError:
debug "Service interrupted"
return
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
return
while true:
# This loop could look much more nicer/better, when
# https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could
# become safe to combine loops, breaks and exception handlers.
let breakLoop =
try:
await vc.pollForFork()
await service.waitForNextEpoch()
false
except CancelledError:
debug "Service interrupted"
true
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
true
if breakLoop:
break
proc init*(t: typedesc[ForkServiceRef],
vc: ValidatorClientRef): Future[ForkServiceRef] {.async.} =
logScope: service = ServiceName
let res = ForkServiceRef(name: ServiceName,
client: vc, state: ServiceState.Initialized)
debug "Initializing service"
return res
proc start*(service: ForkServiceRef) =
service.lifeFut = mainLoop(service)