VC: Hardening and optimizing time handling. (#4743)

* Fix durationToNextSlot() and durationToNextEpoch() to work not only after Genesis, but also before Genesis.
Change VC pre-genesis behavior, add runPreGenesisWaitingLoop() and runGenesisWaitingLoop().
Add checkedWaitForSlot() and checkedWaitForNextSlot() to strictly check current time and print warnings.
Fix VC main loop to use checkedWaitForNextSlot().
Fix attestation_service to run attestation processing only until the end of the duty slot.
Change attestation_service main loop to use checkedWaitForNextSlot().
Change block_service to properly cancel all the pending proposer tasks.
Use checkedWaitForSlot to wait for block proposal.
Fix block_service waitForBlockPublished() to be compatible with BN.
Fix sync_committee_service to avoid asyncSpawn.
Fix sync_committee_service to run only until the end of the duty slot.
Fix sync_committee_service to use checkedWaitForNextSlot().

* Refactor validator logging.
Fix missing delay in aggregated attestation publishing.

* Fix doppelganger detection so that it does not start at pre-genesis time.
Fix fallback service sync status spam.
Fix spurious `sync committee subnets subscription error`.

* Address review comments part 1.

* Address review comments.

* Fix condition issue for near genesis waiting loop.

* Address review comments.

* Address review comments 2.
This commit is contained in:
Eugene Kabanov 2023-04-18 00:31:54 +03:00 committed by GitHub
parent 228e10f1d9
commit b51152153a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 862 additions and 517 deletions

View File

@ -70,18 +70,39 @@ proc fromNow*(c: BeaconClock, slot: Slot): tuple[inFuture: bool, offset: Duratio
c.fromNow(slot.start_beacon_time())
proc durationToNextSlot*(c: BeaconClock): Duration =
let (afterGenesis, slot) = c.now().toSlot()
if afterGenesis:
c.fromNow(slot + 1'u64).offset
let
currentTime = c.now()
currentSlot = currentTime.toSlot()
if currentSlot.afterGenesis:
let nextSlot = currentSlot.slot + 1
chronos.nanoseconds(
(nextSlot.start_beacon_time() - currentTime).nanoseconds)
else:
c.fromNow(Slot(0)).offset
# absoluteTime = BeaconTime(-currentTime.ns_since_genesis).
let
absoluteTime = Slot(0).start_beacon_time() +
(Slot(0).start_beacon_time() - currentTime)
timeToNextSlot = absoluteTime - currentSlot.slot.start_beacon_time()
chronos.nanoseconds(timeToNextSlot.nanoseconds)
proc durationToNextEpoch*(c: BeaconClock): Duration =
let (afterGenesis, slot) = c.now().toSlot()
if afterGenesis:
c.fromNow((slot.epoch + 1).start_slot()).offset
let
currentTime = c.now()
currentSlot = currentTime.toSlot()
if currentSlot.afterGenesis:
let nextEpochSlot = (currentSlot.slot.epoch() + 1).start_slot()
chronos.nanoseconds(
(nextEpochSlot.start_beacon_time() - currentTime).nanoseconds)
else:
c.fromNow(Epoch(0).start_slot()).offset
# absoluteTime = BeaconTime(-currentTime.ns_since_genesis).
let
absoluteTime = Slot(0).start_beacon_time() +
(Slot(0).start_beacon_time() - currentTime)
timeToNextEpoch = absoluteTime -
currentSlot.slot.epoch().start_slot().start_beacon_time()
chronos.nanoseconds(timeToNextEpoch.nanoseconds)
func saturate*(d: tuple[inFuture: bool, offset: Duration]): Duration =
if d.inFuture: d.offset else: seconds(0)

View File

@ -9,9 +9,12 @@ import
libp2p/crypto/crypto,
./rpc/rest_key_management_api,
./validator_client/[
common, fallback_service, duties_service, fork_service,
common, fallback_service, duties_service, fork_service, block_service,
doppelganger_service, attestation_service, sync_committee_service]
const
PREGENESIS_EPOCHS_COUNT = 1
proc initGenesis(vc: ValidatorClientRef): Future[RestGenesis] {.async.} =
info "Initializing genesis", nodes_count = len(vc.beaconNodes)
var nodes = vc.beaconNodes
@ -93,16 +96,22 @@ proc initValidators(vc: ValidatorClientRef): Future[bool] {.async.} =
proc initClock(vc: ValidatorClientRef): Future[BeaconClock] {.async.} =
# This procedure performs initialization of BeaconClock using current genesis
# information. It also performs waiting for genesis.
let res = BeaconClock.init(vc.beaconGenesis.genesis_time)
let currentSlot = res.now().slotOrZero()
let currentEpoch = currentSlot.epoch()
info "Initializing beacon clock",
genesis_time = vc.beaconGenesis.genesis_time,
current_slot = currentSlot, current_epoch = currentEpoch
let genesisTime = res.fromNow(start_beacon_time(Slot(0)))
let
res = BeaconClock.init(vc.beaconGenesis.genesis_time)
currentTime = res.now()
currentSlot = currentTime.slotOrZero()
currentEpoch = currentSlot.epoch()
genesisTime = res.fromNow(Slot(0))
if genesisTime.inFuture:
notice "Waiting for genesis", genesisIn = genesisTime.offset
await sleepAsync(genesisTime.offset)
info "Initializing beacon clock",
genesis_time = vc.beaconGenesis.genesis_time,
current_slot = "<n/a>", current_epoch = "<n/a>",
time_to_genesis = genesisTime.offset
else:
info "Initializing beacon clock",
genesis_time = vc.beaconGenesis.genesis_time,
current_slot = currentSlot, current_epoch = currentEpoch
return res
proc initMetrics(vc: ValidatorClientRef): Future[bool] {.async.} =
@ -139,58 +148,66 @@ proc shutdownSlashingProtection(vc: ValidatorClientRef) =
info "Closing slashing protection", path = vc.config.validatorsDir()
vc.attachedValidators[].slashingProtection.close()
proc onSlotStart(vc: ValidatorClientRef, wallTime: BeaconTime,
lastSlot: Slot): Future[bool] {.async.} =
## Called at the beginning of a slot - usually every slot, but sometimes might
## skip a few in case we're running late.
## wallTime: current system time - we will strive to perform all duties up
## to this point in time
## lastSlot: the last slot that we successfully processed, so we know where to
## start work from - there might be jumps if processing is delayed
proc runVCSlotLoop(vc: ValidatorClientRef) {.async.} =
var
startTime = vc.beaconClock.now()
curSlot = startTime.slotOrZero()
nextSlot = curSlot + 1 # No earlier than GENESIS_SLOT + 1
timeToNextSlot = nextSlot.start_beacon_time() - startTime
let
# The slot we should be at, according to the clock
beaconTime = wallTime
wallSlot = wallTime.toSlot()
info "Scheduling first slot action",
start_time = shortLog(startTime),
current_slot = shortLog(curSlot),
next_slot = shortLog(nextSlot),
time_to_next_slot = shortLog(timeToNextSlot)
let
# If everything was working perfectly, the slot that we should be processing
expectedSlot = lastSlot + 1
delay = wallTime - expectedSlot.start_beacon_time()
var currentSlot = Opt.some(curSlot)
if checkIfShouldStopAtEpoch(wallSlot.slot, vc.config.stopAtEpoch):
return true
while true:
currentSlot = await vc.checkedWaitForNextSlot(currentSlot, ZeroTimeDiff,
true)
if currentSlot.isNone():
## Fatal log line should be printed by checkedWaitForNextSlot().
return
if len(vc.beaconNodes) > 1:
let
counts = vc.getNodeCounts()
# Good nodes are nodes which can be used for ALL the requests.
goodNodes = counts.data[int(RestBeaconNodeStatus.Synced)]
# Viable nodes are nodes which can be used only SOME of the requests.
viableNodes = counts.data[int(RestBeaconNodeStatus.OptSynced)] +
counts.data[int(RestBeaconNodeStatus.NotSynced)] +
counts.data[int(RestBeaconNodeStatus.Compatible)]
# Bad nodes are nodes which can't be used at all.
badNodes = counts.data[int(RestBeaconNodeStatus.Offline)] +
counts.data[int(RestBeaconNodeStatus.Online)] +
counts.data[int(RestBeaconNodeStatus.Incompatible)]
info "Slot start",
slot = shortLog(wallSlot.slot),
attestationIn = vc.getDurationToNextAttestation(wallSlot.slot),
blockIn = vc.getDurationToNextBlock(wallSlot.slot),
validators = vc.attachedValidators[].count(),
good_nodes = goodNodes, viable_nodes = viableNodes, bad_nodes = badNodes,
delay = shortLog(delay)
else:
info "Slot start",
slot = shortLog(wallSlot.slot),
attestationIn = vc.getDurationToNextAttestation(wallSlot.slot),
blockIn = vc.getDurationToNextBlock(wallSlot.slot),
validators = vc.attachedValidators[].count(),
node_status = $vc.beaconNodes[0].status,
delay = shortLog(delay)
wallTime = vc.beaconClock.now()
wallSlot = currentSlot.get()
delay = wallTime - wallSlot.start_beacon_time()
return false
if checkIfShouldStopAtEpoch(wallSlot, vc.config.stopAtEpoch):
return
if len(vc.beaconNodes) > 1:
let
counts = vc.getNodeCounts()
# Good nodes are nodes which can be used for ALL the requests.
goodNodes = counts.data[int(RestBeaconNodeStatus.Synced)]
# Viable nodes are nodes which can be used only SOME of the requests.
viableNodes = counts.data[int(RestBeaconNodeStatus.OptSynced)] +
counts.data[int(RestBeaconNodeStatus.NotSynced)] +
counts.data[int(RestBeaconNodeStatus.Compatible)]
# Bad nodes are nodes which can't be used at all.
badNodes = counts.data[int(RestBeaconNodeStatus.Offline)] +
counts.data[int(RestBeaconNodeStatus.Online)] +
counts.data[int(RestBeaconNodeStatus.Incompatible)]
info "Slot start",
slot = shortLog(wallSlot),
epoch = shortLog(wallSlot.epoch()),
attestationIn = vc.getDurationToNextAttestation(wallSlot),
blockIn = vc.getDurationToNextBlock(wallSlot),
validators = vc.attachedValidators[].count(),
good_nodes = goodNodes, viable_nodes = viableNodes,
bad_nodes = badNodes, delay = shortLog(delay)
else:
info "Slot start",
slot = shortLog(wallSlot),
epoch = shortLog(wallSlot.epoch()),
attestationIn = vc.getDurationToNextAttestation(wallSlot),
blockIn = vc.getDurationToNextBlock(wallSlot),
validators = vc.attachedValidators[].count(),
node_status = $vc.beaconNodes[0].status,
delay = shortLog(delay)
proc new*(T: type ValidatorClientRef,
config: ValidatorClientConf,
@ -224,6 +241,8 @@ proc new*(T: type ValidatorClientRef,
config: config,
beaconNodes: beaconNodes,
graffitiBytes: config.graffiti.get(defaultGraffitiBytes()),
preGenesisEvent: newAsyncEvent(),
genesisEvent: newAsyncEvent(),
nodesAvailable: newAsyncEvent(),
forksAvailable: newAsyncEvent(),
doppelExit: newAsyncEvent(),
@ -239,6 +258,8 @@ proc new*(T: type ValidatorClientRef,
config: config,
beaconNodes: beaconNodes,
graffitiBytes: config.graffiti.get(defaultGraffitiBytes()),
preGenesisEvent: newAsyncEvent(),
genesisEvent: newAsyncEvent(),
nodesAvailable: newAsyncEvent(),
forksAvailable: newAsyncEvent(),
indicesAvailable: newAsyncEvent(),
@ -260,8 +281,8 @@ proc asyncInit(vc: ValidatorClientRef): Future[ValidatorClientRef] {.async.} =
vc.beaconGenesis = await vc.initGenesis()
info "Genesis information", genesis_time = vc.beaconGenesis.genesis_time,
genesis_fork_version = vc.beaconGenesis.genesis_fork_version,
genesis_root = vc.beaconGenesis.genesis_validators_root
genesis_fork_version = vc.beaconGenesis.genesis_fork_version,
genesis_root = vc.beaconGenesis.genesis_validators_root
vc.beaconClock = await vc.initClock()
@ -295,6 +316,7 @@ proc asyncInit(vc: ValidatorClientRef): Future[ValidatorClientRef] {.async.} =
vc.dutiesService = await DutiesServiceRef.init(vc)
vc.doppelgangerService = await DoppelgangerServiceRef.init(vc)
vc.attestationService = await AttestationServiceRef.init(vc)
vc.blockService = await BlockServiceRef.init(vc)
vc.syncCommitteeService = await SyncCommitteeServiceRef.init(vc)
vc.keymanagerServer = keymanagerInitResult.server
if vc.keymanagerServer != nil:
@ -322,12 +344,65 @@ proc asyncInit(vc: ValidatorClientRef): Future[ValidatorClientRef] {.async.} =
return vc
proc runPreGenesisWaitingLoop(vc: ValidatorClientRef) {.async.} =
var breakLoop = false
while not(breakLoop):
let
genesisTime = vc.beaconClock.fromNow(Slot(0))
currentEpoch = vc.beaconClock.now().toSlot().slot.epoch()
if not(genesisTime.inFuture) or currentEpoch < PREGENESIS_EPOCHS_COUNT:
break
notice "Waiting for genesis",
genesis_time = vc.beaconGenesis.genesis_time,
time_to_genesis = genesisTime.offset
breakLoop =
try:
await sleepAsync(vc.beaconClock.durationToNextSlot())
false
except CancelledError:
debug "Pre-genesis waiting loop was interrupted"
true
except CatchableError as exc:
error "Pre-genesis waiting loop failed with unexpected error",
err_name = $exc.name, err_msg = $exc.msg
true
vc.preGenesisEvent.fire()
proc runGenesisWaitingLoop(vc: ValidatorClientRef) {.async.} =
var breakLoop = false
while not(breakLoop):
let genesisTime = vc.beaconClock.fromNow(Slot(0))
if not(genesisTime.inFuture):
break
notice "Waiting for genesis",
genesis_time = vc.beaconGenesis.genesis_time,
time_to_genesis = genesisTime.offset
breakLoop =
try:
await sleepAsync(vc.beaconClock.durationToNextSlot())
false
except CancelledError:
debug "Genesis waiting loop was interrupted"
true
except CatchableError as exc:
error "Genesis waiting loop failed with unexpected error",
err_name = $exc.name, err_msg = $exc.msg
true
vc.genesisEvent.fire()
proc asyncRun*(vc: ValidatorClientRef) {.async.} =
vc.fallbackService.start()
vc.forkService.start()
vc.dutiesService.start()
vc.doppelgangerService.start()
vc.attestationService.start()
vc.blockService.start()
vc.syncCommitteeService.start()
if not isNil(vc.keymanagerServer):
@ -337,7 +412,12 @@ proc asyncRun*(vc: ValidatorClientRef) {.async.} =
let doppelEventFut = vc.doppelExit.wait()
try:
vc.runSlotLoopFut = runSlotLoop(vc, vc.beaconClock.now(), onSlotStart)
# Waiting for `GENESIS - PREGENESIS_EPOCHS_COUNT` loop.
await vc.runPreGenesisWaitingLoop()
# Waiting for `GENESIS` loop.
await vc.runGenesisWaitingLoop()
# Main processing loop.
vc.runSlotLoopFut = vc.runVCSlotLoop()
vc.runKeystoreCachePruningLoopFut =
runKeystorecachePruningLoop(vc.keystoreCache)
discard await race(vc.runSlotLoopFut, doppelEventFut)
@ -355,8 +435,6 @@ proc asyncRun*(vc: ValidatorClientRef) {.async.} =
if doppelEventFut.completed():
# Critically, database has been shut down - the rest doesn't matter, we need
# to stop as soon as possible
# TODO we need to actually quit _before_ any other async tasks have had the
# chance to happen
quitDoppelganger()
debug "Stopping main processing loop"
@ -373,10 +451,10 @@ proc asyncRun*(vc: ValidatorClientRef) {.async.} =
pending.add(vc.dutiesService.stop())
pending.add(vc.doppelgangerService.stop())
pending.add(vc.attestationService.stop())
pending.add(vc.blockService.stop())
pending.add(vc.syncCommitteeService.stop())
if not isNil(vc.keymanagerServer):
pending.add(vc.keymanagerServer.stop())
await allFutures(pending)
template runWithSignals(vc: ValidatorClientRef, body: untyped): bool =

View File

@ -33,6 +33,9 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
doAssert(validator.index.isSome())
let vindex = validator.index.get()
logScope:
validator = validatorLog(validator)
# TODO: signing_root is recomputed in getAttestationSignature just after,
# but not for locally attached validators.
let signingRoot =
@ -47,8 +50,7 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
warn "Slashing protection activated for attestation",
attestationData = shortLog(adata),
signingRoot = shortLog(signingRoot),
validator = shortLog(validator),
validator_index = vindex, badVoteDetails = $notSlashable.error
badVoteDetails = $notSlashable.error
return false
let attestation = block:
@ -57,8 +59,7 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
let res = await validator.getAttestationSignature(
fork, vc.beaconGenesis.genesis_validators_root, adata)
if res.isErr():
warn "Unable to sign attestation", validator = shortLog(validator),
error_msg = res.error()
warn "Unable to sign attestation", reason = res.error()
return false
res.get()
except CancelledError as exc:
@ -74,9 +75,11 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
int(duty.data.committee_length), adata, signature).expect(
"data validity checked earlier")
debug "Sending attestation", attestation = shortLog(attestation),
validator = shortLog(validator), validator_index = vindex,
delay = vc.getDelay(adata.slot.attestation_deadline())
logScope:
attestation = shortLog(attestation)
delay = vc.getDelay(adata.slot.attestation_deadline())
debug "Sending attestation"
validator.doppelgangerActivity(attestation.data.slot.epoch)
@ -84,36 +87,23 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
try:
await vc.submitPoolAttestations(@[attestation], ApiStrategyKind.First)
except ValidatorApiError as exc:
warn "Unable to publish attestation",
attestation = shortLog(attestation),
validator = shortLog(validator),
validator_index = vindex,
reason = exc.getFailureReason()
warn "Unable to publish attestation", reason = exc.getFailureReason()
return false
except CancelledError as exc:
debug "Attestation publishing process was interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error occured while publishing attestation",
attestation = shortLog(attestation),
validator = shortLog(validator),
validator_index = vindex,
err_name = exc.name, err_msg = exc.msg
return false
let delay = vc.getDelay(adata.slot.attestation_deadline())
if res:
let delay = vc.getDelay(adata.slot.attestation_deadline())
beacon_attestations_sent.inc()
beacon_attestation_sent_delay.observe(delay.toFloatSeconds())
notice "Attestation published", attestation = shortLog(attestation),
validator = shortLog(validator),
validator_index = vindex,
delay = delay
notice "Attestation published"
else:
warn "Attestation was not accepted by beacon node",
attestation = shortLog(attestation),
validator = shortLog(validator),
validator_index = vindex, delay = delay
warn "Attestation was not accepted by beacon node"
return res
proc serveAggregateAndProof*(service: AttestationServiceRef,
@ -124,21 +114,21 @@ proc serveAggregateAndProof*(service: AttestationServiceRef,
vc = service.client
genesisRoot = vc.beaconGenesis.genesis_validators_root
slot = proof.aggregate.data.slot
vindex = validator.index.get()
fork = vc.forkAtEpoch(slot.epoch)
debug "Signing aggregate", validator = shortLog(validator),
attestation = shortLog(proof.aggregate), fork = fork
logScope:
validator = validatorLog(validator)
attestation = shortLog(proof.aggregate)
debug "Signing aggregate", fork = fork
let signature =
try:
let res = await validator.getAggregateAndProofSignature(
fork, genesisRoot, proof)
let res =
await validator.getAggregateAndProofSignature(fork, genesisRoot, proof)
if res.isErr():
warn "Unable to sign aggregate and proof using remote signer",
validator = shortLog(validator),
attestation = shortLog(proof.aggregate),
error_msg = res.error()
reason = res.error()
return false
res.get()
except CancelledError as exc:
@ -146,19 +136,15 @@ proc serveAggregateAndProof*(service: AttestationServiceRef,
raise exc
except CatchableError as exc:
error "Unexpected error occured while signing aggregated attestation",
validator = shortLog(validator),
attestation = shortLog(proof.aggregate),
validator_index = vindex,
err_name = exc.name, err_msg = exc.msg
return false
let signedProof = SignedAggregateAndProof(message: proof,
signature: signature)
logScope:
delay = vc.getDelay(slot.aggregate_deadline())
debug "Sending aggregated attestation", fork = fork,
attestation = shortLog(signedProof.message.aggregate),
validator = shortLog(validator), validator_index = vindex,
delay = vc.getDelay(slot.aggregate_deadline())
debug "Sending aggregated attestation", fork = fork
validator.doppelgangerActivity(proof.aggregate.data.slot.epoch)
@ -167,32 +153,21 @@ proc serveAggregateAndProof*(service: AttestationServiceRef,
await vc.publishAggregateAndProofs(@[signedProof], ApiStrategyKind.First)
except ValidatorApiError as exc:
warn "Unable to publish aggregated attestation",
attestation = shortLog(signedProof.message.aggregate),
validator = shortLog(validator),
validator_index = vindex,
reason = exc.getFailureReason()
reason = exc.getFailureReason()
return false
except CancelledError as exc:
debug "Publish aggregate and proofs request was interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error occured while publishing aggregated attestation",
attestation = shortLog(signedProof.message.aggregate),
validator = shortLog(validator),
err_name = exc.name, err_msg = exc.msg
return false
if res:
beacon_aggregates_sent.inc()
notice "Aggregated attestation published",
attestation = shortLog(signedProof.message.aggregate),
validator = shortLog(validator),
validator_index = vindex
notice "Aggregated attestation published"
else:
warn "Aggregated attestation was not accepted by beacon node",
attestation = shortLog(signedProof.message.aggregate),
validator = shortLog(validator),
validator_index = vindex
warn "Aggregated attestation was not accepted by beacon node"
return res
proc produceAndPublishAttestations*(service: AttestationServiceRef,
@ -394,7 +369,7 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef,
await service.produceAndPublishAggregates(ad, duties)
proc spawnAttestationTasks(service: AttestationServiceRef,
slot: Slot) =
slot: Slot) {.async.} =
let vc = service.client
let dutiesByCommittee =
block:
@ -405,34 +380,67 @@ proc spawnAttestationTasks(service: AttestationServiceRef,
res.mgetOrPut(item.data.committee_index, default).add(item)
res
var dutiesSkipped: seq[string]
for index, duties in dutiesByCommittee:
asyncSpawn service.publishAttestationsAndAggregates(slot, index, duties)
if len(dutiesSkipped) > 0:
info "Doppelganger protection disabled validator duties",
validators = len(dutiesSkipped)
trace "Doppelganger protection disabled validator duties dump",
validators = dutiesSkipped
var tasks: seq[Future[void]]
try:
for index, duties in dutiesByCommittee:
tasks.add(service.publishAttestationsAndAggregates(slot, index, duties))
let timeout = vc.beaconClock.durationToNextSlot()
await allFutures(tasks).wait(timeout)
except AsyncTimeoutError:
# Cancelling all the pending tasks.
let pending = tasks.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
await allFutures(pending)
except CancelledError as exc:
# Cancelling all the pending tasks.
let pending = tasks.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
await allFutures(pending)
raise exc
except CatchableError as exc:
error "Unexpected error while processing attestation duties",
error_name = exc.name, error_message = exc.msg
proc mainLoop(service: AttestationServiceRef) {.async.} =
let vc = service.client
service.state = ServiceState.Running
debug "Service started"
debug "Attester loop is waiting for initialization"
try:
await allFutures(
vc.preGenesisEvent.wait(),
vc.genesisEvent.wait(),
vc.indicesAvailable.wait(),
vc.forksAvailable.wait()
)
except CancelledError:
debug "Service interrupted"
return
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
return
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
var currentSlot: Opt[Slot]
while true:
# This loop could be written much more cleanly once
# https://github.com/nim-lang/Nim/issues/19911 is fixed, because it would
# then be safe to combine loops, breaks and exception handlers.
let breakLoop =
try:
let sleepTime =
attestationSlotOffset + vc.beaconClock.durationToNextSlot()
let sres = vc.getCurrentSlot()
if sres.isSome():
let currentSlot = sres.get()
service.spawnAttestationTasks(currentSlot)
await sleepAsync(sleepTime)
false
let
# We use zero offset here, because we do waiting in
# waitForBlockPublished(attestationSlotOffset).
slot = await vc.checkedWaitForNextSlot(currentSlot,
ZeroTimeDiff, false)
if slot.isNone():
debug "System time adjusted backwards significantly, exiting"
true
else:
currentSlot = slot
await service.spawnAttestationTasks(currentSlot.get())
false
except CancelledError:
debug "Service interrupted"
true

View File

@ -11,7 +11,10 @@ import
".."/spec/forks,
common, api
logScope: service = "block_service"
const
ServiceName = "block_service"
logScope: service = ServiceName
type
PreparedBeaconBlock = object
@ -300,26 +303,28 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
proc proposeBlock(vc: ValidatorClientRef, slot: Slot,
proposerKey: ValidatorPubKey) {.async.} =
let (inFuture, timeToSleep) = vc.beaconClock.fromNow(slot)
try:
if inFuture:
debug "Proposing block", timeIn = timeToSleep,
validator = shortLog(proposerKey)
await sleepAsync(timeToSleep)
else:
debug "Proposing block", timeIn = 0.seconds,
validator = shortLog(proposerKey)
let
currentSlot = (await vc.checkedWaitForSlot(slot, ZeroTimeDiff,
false)).valueOr:
error "Unable to perform block production because of system time"
return
let sres = vc.getCurrentSlot()
if sres.isSome():
let
currentSlot = sres.get()
validator = vc.getValidatorForDuties(proposerKey, slot).valueOr: return
await vc.publishBlock(currentSlot, slot, validator)
if currentSlot > slot:
warn "Skip block production for expired slot",
current_slot = currentSlot, duties_slot = slot
return
let validator = vc.getValidatorForDuties(proposerKey, slot).valueOr: return
try:
await vc.publishBlock(currentSlot, slot, validator)
except CancelledError as exc:
debug "Block proposing was interrupted", slot = slot,
validator = shortLog(proposerKey)
debug "Block proposing process was interrupted",
slot = slot, validator = shortLog(proposerKey)
raise exc
except CatchableError as exc:
error "Unexpected error encountered while proposing block",
slot = slot, validator = shortLog(validator)
proc spawnProposalTask(vc: ValidatorClientRef,
duty: RestProposerDuty): ProposerTask =
@ -356,73 +361,72 @@ proc checkDuty(duty: RestProposerDuty, epoch: Epoch, slot: Slot): bool =
proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch,
dependentRoot: Eth2Digest,
duties: openArray[RestProposerDuty]) =
let default = ProposedData(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64))
let sres = vc.getCurrentSlot()
if sres.isSome():
let
currentSlot = sres.get()
epochDuties = vc.proposers.getOrDefault(epoch, default)
if not(epochDuties.isDefault()):
if epochDuties.dependentRoot != dependentRoot:
warn "Proposer duties re-organization", duties_count = len(duties),
wall_slot = currentSlot, epoch = epoch,
prior_dependent_root = epochDuties.dependentRoot,
dependent_root = dependentRoot, wall_slot = currentSlot
let tasks =
block:
var res: seq[ProposerTask]
var hashset = initHashSet[Slot]()
let
default = ProposedData(epoch: FAR_FUTURE_EPOCH)
currentSlot = vc.getCurrentSlot().get(Slot(0))
epochDuties = vc.proposers.getOrDefault(epoch, default)
for task in epochDuties.duties:
if task notin duties:
# Task is no longer relevant, so cancel it.
debug "Cancelling running proposal duty task",
slot = task.duty.slot,
validator = shortLog(task.duty.pubkey)
task.future.cancel()
else:
# If task is already running for proper slot, we keep it alive.
debug "Keep running previous proposal duty task",
slot = task.duty.slot,
validator = shortLog(task.duty.pubkey)
res.add(task)
for duty in duties:
if duty notin res:
debug "New proposal duty received", slot = duty.slot,
validator = shortLog(duty.pubkey)
if checkDuty(duty, epoch, currentSlot):
let task = vc.spawnProposalTask(duty)
if duty.slot in hashset:
warn "Multiple block proposers for this slot, " &
"producing blocks for all proposers", slot = duty.slot
else:
hashset.incl(duty.slot)
res.add(task)
res
vc.proposers[epoch] = ProposedData.init(epoch, dependentRoot, tasks)
else:
debug "New block proposal duties received",
dependent_root = dependentRoot, duties_count = len(duties),
wall_slot = currentSlot, epoch = epoch
# Spawn new proposer tasks and modify proposers map.
if not(epochDuties.isDefault()):
if epochDuties.dependentRoot != dependentRoot:
warn "Proposer duties re-organization", duties_count = len(duties),
wall_slot = currentSlot, epoch = epoch,
prior_dependent_root = epochDuties.dependentRoot,
dependent_root = dependentRoot
let tasks =
block:
var hashset = initHashSet[Slot]()
var res: seq[ProposerTask]
for duty in duties:
debug "New proposal duty received", slot = duty.slot,
validator = shortLog(duty.pubkey)
if checkDuty(duty, epoch, currentSlot):
let task = vc.spawnProposalTask(duty)
if duty.slot in hashset:
warn "Multiple block proposers for this slot, " &
"producing blocks for all proposers", slot = duty.slot
else:
hashset.incl(duty.slot)
var hashset = initHashSet[Slot]()
for task in epochDuties.duties:
if task notin duties:
# Task is no longer relevant, so cancel it.
debug "Cancelling running proposal duty task",
slot = task.duty.slot,
validator = shortLog(task.duty.pubkey)
task.future.cancel()
else:
# If task is already running for proper slot, we keep it alive.
debug "Keep running previous proposal duty task",
slot = task.duty.slot,
validator = shortLog(task.duty.pubkey)
res.add(task)
for duty in duties:
if duty notin res:
debug "New proposal duty received", slot = duty.slot,
validator = shortLog(duty.pubkey)
if checkDuty(duty, epoch, currentSlot):
let task = vc.spawnProposalTask(duty)
if duty.slot in hashset:
error "Multiple block proposers for this slot, " &
"producing blocks for all proposers", slot = duty.slot
else:
hashset.incl(duty.slot)
res.add(task)
res
vc.proposers[epoch] = ProposedData.init(epoch, dependentRoot, tasks)
else:
debug "New block proposal duties received",
dependent_root = dependentRoot, duties_count = len(duties),
wall_slot = currentSlot, epoch = epoch
# Spawn new proposer tasks and modify proposers map.
let tasks =
block:
var hashset = initHashSet[Slot]()
var res: seq[ProposerTask]
for duty in duties:
debug "New proposal duty received", slot = duty.slot,
validator = shortLog(duty.pubkey)
if checkDuty(duty, epoch, currentSlot):
let task = vc.spawnProposalTask(duty)
if duty.slot in hashset:
error "Multiple block proposers for this slot, " &
"producing blocks for all proposers", slot = duty.slot
else:
hashset.incl(duty.slot)
res.add(task)
res
vc.proposers[epoch] = ProposedData.init(epoch, dependentRoot, tasks)
proc waitForBlockPublished*(vc: ValidatorClientRef,
slot: Slot, timediff: TimeDiff) {.async.} =
@ -439,24 +443,100 @@ proc waitForBlockPublished*(vc: ValidatorClientRef,
if not(task.future.finished()):
res.add(task.future)
res
waitTime = (start_beacon_time(slot) + timediff) - vc.beaconClock.now()
logScope:
start_time = startTime
pending_tasks = len(pendingTasks)
slot = slot
timediff = timediff
if len(pendingTasks) > 0:
let waitTime = (start_beacon_time(slot) + timediff) - vc.beaconClock.now()
logScope:
wait_time = waitTime
if waitTime.nanoseconds > 0'i64:
# TODO (cheatfate): This algorithm should be tuned, when we will have ability
# to monitor block proposals which are not created by validators bundled with
# VC.
logScope: wait_time = waitTime
if waitTime.nanoseconds > 0'i64:
if len(pendingTasks) > 0:
# Block proposal pending
try:
await allFutures(pendingTasks).wait(nanoseconds(waitTime.nanoseconds))
trace "Block proposal awaited"
# The expected block arrived - in our async loop however, we might
# have been doing other processing that caused delays here so we'll
# cap the waiting to the time when we would have sent out attestations
# had the block not arrived. An opposite case is that we received
# (or produced) a block that has not yet reached our neighbours. To
# protect against our attestations being dropped (because the others
# have not yet seen the block), we'll impose a minimum delay of
# 2000ms. The delay is enforced only when we're not hitting the
# "normal" cutoff time for sending out attestations. An earlier delay
# of 250ms has proven to be not enough, increasing the risk of losing
# attestations, and with growing block sizes, 1000ms started to be
# risky as well. Regardless, because we "just" received the block,
# we'll impose the delay.
# Take into consideration chains with a different slot time
const afterBlockDelay = nanos(attestationSlotOffset.nanoseconds div 2)
let
afterBlockTime = vc.beaconClock.now() + afterBlockDelay
afterBlockCutoff = vc.beaconClock.fromNow(
min(afterBlockTime,
slot.attestation_deadline() + afterBlockDelay))
if afterBlockCutoff.inFuture:
debug "Got block, waiting to send attestations",
after_block_cutoff = shortLog(afterBlockCutoff.offset)
await sleepAsync(afterBlockCutoff.offset)
except CancelledError as exc:
let dur = Moment.now() - startTime
debug "Waiting for block publication interrupted", duration = dur
raise exc
except AsyncTimeoutError:
let dur = Moment.now() - startTime
debug "Block was not published in time", duration = dur
else:
# No pending block proposals.
try:
await allFutures(pendingTasks).wait(nanoseconds(waitTime.nanoseconds))
trace "Block proposal awaited"
await sleepAsync(nanoseconds(waitTime.nanoseconds))
except CancelledError as exc:
let dur = Moment.now() - startTime
debug "Waiting for block publication interrupted", duration = dur
raise exc
except AsyncTimeoutError:
except CatchableError as exc:
let dur = Moment.now() - startTime
debug "Block was not published in time", duration = dur
error "Unexpected error occured while waiting for block publication",
err_name = exc.name, err_msg = exc.msg, duration = dur
return
proc mainLoop(service: BlockServiceRef) {.async.} =
  ## Main loop of the block service.
  ##
  ## Marks the service as running and then parks on a future which nobody
  ## completes, so the wait ends only through cancellation (or an unexpected
  ## error). Afterwards every proposer duty future that has not finished yet
  ## is cancelled and awaited.
  let vc = service.client
  service.state = ServiceState.Running
  debug "Service started"

  # Nothing completes this future, so cancelling it is the only way to leave
  # the wait below.
  let parked = newFuture[void]()
  try:
    await parked
  except CancelledError:
    debug "Service interrupted"
  except CatchableError as exc:
    error "Service crashed with unexpected error", err_name = exc.name,
          err_msg = exc.msg

  # Clean up: cancel all the proposer tasks which are still pending and wait
  # until all the cancellations are processed.
  var cancellations: seq[Future[void]]
  for epoch, data in vc.proposers.pairs():
    for duty in data.duties.items():
      if duty.future.finished():
        continue
      cancellations.add(duty.future.cancelAndWait())
  await allFutures(cancellations)
proc init*(t: typedesc[BlockServiceRef],
           vc: ValidatorClientRef): Future[BlockServiceRef] {.async.} =
  ## Creates a block service bound to the validator client `vc`, in the
  ## `Initialized` state. The service is not started here.
  logScope: service = ServiceName
  let blockService = BlockServiceRef(
    name: ServiceName,
    client: vc,
    state: ServiceState.Initialized
  )
  debug "Initializing service"
  return blockService
proc start*(service: BlockServiceRef) =
  ## Launches the service main loop and keeps its future in `lifeFut`.
  service.lifeFut = service.mainLoop()

View File

@ -39,14 +39,12 @@ const
DelayBuckets* = [-Inf, -4.0, -2.0, -1.0, -0.5, -0.1, -0.05,
0.05, 0.1, 0.5, 1.0, 2.0, 4.0, 8.0, Inf]
ZeroTimeDiff* = TimeDiff(nanoseconds: 0'i64)
type
ServiceState* {.pure.} = enum
Initialized, Running, Error, Closing, Closed
BlockServiceEventRef* = ref object of RootObj
slot*: Slot
proposers*: seq[ValidatorPubKey]
RegistrationKind* {.pure.} = enum
Cached, IncorrectTime, MissingIndex, MissingFee, MissingGasLimit
ErrorSignature, NoSignature
@ -174,6 +172,8 @@ type
beaconClock*: BeaconClock
attachedValidators*: ref ValidatorPool
forks*: seq[Fork]
preGenesisEvent*: AsyncEvent
genesisEvent*: AsyncEvent
forksAvailable*: AsyncEvent
nodesAvailable*: AsyncEvent
indicesAvailable*: AsyncEvent
@ -201,7 +201,7 @@ type
data*: seq[ApiNodeFailure]
const
DefaultDutyAndProof* = DutyAndProof(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64))
DefaultDutyAndProof* = DutyAndProof(epoch: FAR_FUTURE_EPOCH)
SlotDuration* = int64(SECONDS_PER_SLOT).seconds
OneThirdDuration* = int64(SECONDS_PER_SLOT).seconds div INTERVALS_PER_SLOT
AllBeaconNodeRoles* = {
@ -323,12 +323,21 @@ proc `$`*(bn: BeaconNodeServerRef): string =
bn.logIdent
proc validatorLog*(key: ValidatorPubKey,
                   index: ValidatorIndex): string =
  ## Builds a log identifier for a validator: `shortLog(key)`, followed by
  ## '@' and the validator index rendered as a decimal number.
  var res = shortLog(key)
  res.add('@')
  res.add(Base10.toString(uint64(index)))
  res
proc validatorLog*(validator: AttachedValidator): string =
  ## Builds a log identifier for an attached validator:
  ## `shortLog(validator)`, followed by '@' and either the decimal validator
  ## index or `<missing>` when the index is not set.
  let indexText =
    if validator.index.isSome():
      Base10.toString(uint64(validator.index.get()))
    else:
      "<missing>"
  var ident = shortLog(validator)
  ident.add('@')
  ident.add(indexText)
  ident
chronicles.expandIt(BeaconNodeServerRef):
node = $it
node_index = it.index
@ -564,18 +573,12 @@ proc init*(t: typedesc[ProposedData], epoch: Epoch, dependentRoot: Eth2Digest,
data: openArray[ProposerTask]): ProposedData =
ProposedData(epoch: epoch, dependentRoot: dependentRoot, duties: @data)
proc getCurrentSlot*(vc: ValidatorClientRef): Option[Slot] =
let
wallTime = vc.beaconClock.now()
wallSlot = wallTime.toSlot()
if not(wallSlot.afterGenesis):
let checkGenesisTime = vc.beaconClock.fromNow(start_beacon_time(Slot(0)))
warn "Jump in time detected, something wrong with wallclock",
wall_time = wallTime, genesisIn = checkGenesisTime.offset
none[Slot]()
proc getCurrentSlot*(vc: ValidatorClientRef): Opt[Slot] =
let res = vc.beaconClock.now().toSlot()
if res.afterGenesis:
Opt.some(res.slot)
else:
some(wallSlot.slot)
Opt.none(Slot)
proc getAttesterDutiesForSlot*(vc: ValidatorClientRef,
slot: Slot): seq[DutyAndProof] =
@ -915,3 +918,75 @@ proc prepareRegistrationList*(
proc init*(t: typedesc[ApiNodeFailure], node: BeaconNodeServerRef,
           failure: ApiFailure): ApiNodeFailure =
  ## Pairs a beacon node with the API failure recorded for it.
  ApiNodeFailure(
    node: node,
    failure: failure
  )
proc checkedWaitForSlot*(vc: ValidatorClientRef, destinationSlot: Slot,
                         offset: TimeDiff,
                         showLogs: bool): Future[Opt[Slot]] {.async.} =
  ## Sleeps until the start of `destinationSlot` plus `offset` and then
  ## verifies the wall clock against the expected slot.
  ##
  ## A negative `offset` is treated as zero. `showLogs` controls whether the
  ## clock anomalies detected here are logged.
  ##
  ## Result:
  ## * `Opt.some(destinationSlot)` - woke up inside the expected slot.
  ## * `Opt.some(wallSlot)` - woke up in a later slot than expected.
  ## * `Opt.none(Slot)` - after sleeping the wall clock is more than one slot
  ##   behind `destinationSlot`.
  ##
  ## When the wall clock is exactly one slot behind, the sleep is rescheduled
  ## and the check is repeated.
  let
    startTime = vc.beaconClock.now()
    startSlot = startTime.slotOrZero()
    # Negative offsets are clamped to zero.
    extraDelay = chronos.nanoseconds(max(offset.nanoseconds, 0'i64))

  var sleepFor =
    (destinationSlot.start_beacon_time() - startTime) + extraDelay

  logScope:
    start_time = shortLog(startTime)
    start_slot = shortLog(startSlot)
    dest_slot = shortLog(destinationSlot)
    time_to_slot = shortLog(sleepFor)

  while true:
    await sleepAsync(sleepFor)

    let
      nowTime = vc.beaconClock.now()
      nowSlot = nowTime.slotOrZero()

    logScope:
      wall_time = shortLog(nowTime)
      wall_slot = shortLog(nowSlot)

    if nowSlot == destinationSlot:
      # Woke up exactly where we expected to.
      return Opt.some(destinationSlot)

    if nowSlot > destinationSlot:
      # We are late - report the slot we actually woke up in.
      if showLogs:
        if nowSlot > destinationSlot + SLOTS_PER_EPOCH:
          warn "Time moved forwards by more than an epoch, skipping ahead"
        else:
          notice "Missed expected slot start, catching up"
      return Opt.some(nowSlot)

    # From here on `nowSlot < destinationSlot`: while we were sleeping, the
    # system clock changed and time moved backwards!
    if nowSlot + 1 < destinationSlot:
      # This is a critical condition where it's hard to reason about what
      # to do next - the caller is told about it through an empty result.
      if showLogs:
        fatal "System time adjusted backwards significantly - " &
              "clock may be inaccurate - shutting down"
      return Opt.none(Slot)

    # Time moved back by a single slot - this could be a minor adjustment,
    # for example when NTP does its thing after not working for a while.
    sleepFor = destinationSlot.start_beacon_time() - nowTime + extraDelay
    if showLogs:
      warn "System time adjusted backwards, rescheduling slot actions"
proc checkedWaitForNextSlot*(vc: ValidatorClientRef, curSlot: Opt[Slot],
                             offset: TimeDiff,
                             showLogs: bool): Future[Opt[Slot]] =
  ## Waits, via `checkedWaitForSlot`, for the slot following `curSlot`.
  ## When `curSlot` is empty the slot derived from the current wall time
  ## (`slotOrZero`) is used as the base instead.
  let
    wallTime = vc.beaconClock.now()
    baseSlot =
      if curSlot.isSome():
        curSlot.get()
      else:
        wallTime.slotOrZero()
  vc.checkedWaitForSlot(baseSlot + 1, offset, showLogs)

View File

@ -58,6 +58,21 @@ proc mainLoop(service: DoppelgangerServiceRef) {.async.} =
debug "Service disabled because of configuration settings"
return
debug "Doppelganger detection loop is waiting for initialization"
try:
await allFutures(
vc.preGenesisEvent.wait(),
vc.genesisEvent.wait(),
vc.indicesAvailable.wait()
)
except CancelledError:
debug "Service interrupted"
return
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
return
# On (re)start, we skip the remainder of the epoch before we start monitoring
# for doppelgangers so we don't trigger on the attestations we produced before
# the epoch - there's no activity in the genesis slot, so if we start at or

View File

@ -38,7 +38,9 @@ proc checkDuty(duty: RestAttesterDuty): bool =
proc checkSyncDuty(duty: RestSyncCommitteeDuty): bool =
  ## True when the duty's validator index does not exceed
  ## `VALIDATOR_REGISTRY_LIMIT`.
  let validatorIndex = uint64(duty.validator_index)
  validatorIndex <= VALIDATOR_REGISTRY_LIMIT
proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} =
proc pollForValidatorIndices*(service: DutiesServiceRef) {.async.} =
let vc = service.client
let validatorIdents =
block:
var res: seq[ValidatorIdent]
@ -107,16 +109,12 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} =
updated_validators = updated
vc.indicesAvailable.fire()
proc pollForAttesterDuties*(vc: ValidatorClientRef,
proc pollForAttesterDuties*(service: DutiesServiceRef,
epoch: Epoch): Future[int] {.async.} =
let validatorIndices =
block:
var res: seq[ValidatorIndex]
for index in vc.attachedValidators[].indices():
res.add(index)
res
let vc = service.client
let validatorIndices = toSeq(vc.attachedValidators[].indices())
if validatorIndices.len == 0:
if len(validatorIndices) == 0:
return 0
var duties: seq[RestAttesterDuty]
@ -243,7 +241,8 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
return len(addOrReplaceItems)
proc pruneSyncCommitteeDuties*(vc: ValidatorClientRef, slot: Slot) =
proc pruneSyncCommitteeDuties*(service: DutiesServiceRef, slot: Slot) =
let vc = service.client
if slot.is_sync_committee_period():
var newSyncCommitteeDuties: SyncCommitteeDutiesMap
let epoch = slot.epoch()
@ -255,8 +254,9 @@ proc pruneSyncCommitteeDuties*(vc: ValidatorClientRef, slot: Slot) =
newSyncCommitteeDuties[key] = currentPeriodDuties
vc.syncCommitteeDuties = newSyncCommitteeDuties
proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef,
proc pollForSyncCommitteeDuties*(service: DutiesServiceRef,
epoch: Epoch): Future[int] {.async.} =
let vc = service.client
let validatorIndices = toSeq(vc.attachedValidators[].indices())
var
filteredDuties: seq[RestSyncCommitteeDuty]
@ -335,7 +335,8 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef,
return len(addOrReplaceItems)
proc pruneAttesterDuties(vc: ValidatorClientRef, epoch: Epoch) =
proc pruneAttesterDuties(service: DutiesServiceRef, epoch: Epoch) =
let vc = service.client
var attesters: AttesterMap
for key, item in vc.attesters:
var v = EpochDuties()
@ -348,7 +349,7 @@ proc pruneAttesterDuties(vc: ValidatorClientRef, epoch: Epoch) =
attesters[key] = v
vc.attesters = attesters
proc pollForAttesterDuties*(vc: ValidatorClientRef) {.async.} =
proc pollForAttesterDuties*(service: DutiesServiceRef) {.async.} =
## Query the beacon node for attestation duties for all known validators.
##
## This function will perform (in the following order):
@ -356,125 +357,127 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef) {.async.} =
## 1. Poll for current-epoch duties and update the local `attesters` map.
## 2. Poll for next-epoch duties and update the local `attesters` map.
## 3. Push out any attestation subnet subscriptions to the BN.
let sres = vc.getCurrentSlot()
if sres.isSome():
let
currentSlot = sres.get()
currentEpoch = currentSlot.epoch()
nextEpoch = currentEpoch + 1'u64
let vc = service.client
let
currentSlot = vc.getCurrentSlot().get(Slot(0))
currentEpoch = currentSlot.epoch()
nextEpoch = currentEpoch + 1'u64
if vc.attachedValidators[].count() != 0:
var counts: array[2, tuple[epoch: Epoch, count: int]]
counts[0] = (currentEpoch, await vc.pollForAttesterDuties(currentEpoch))
counts[1] = (nextEpoch, await vc.pollForAttesterDuties(nextEpoch))
if vc.attachedValidators[].count() != 0:
var counts: array[2, tuple[epoch: Epoch, count: int]]
counts[0] = (currentEpoch,
await service.pollForAttesterDuties(currentEpoch))
counts[1] = (nextEpoch,
await service.pollForAttesterDuties(nextEpoch))
if (counts[0].count == 0) and (counts[1].count == 0):
debug "No new attester's duties received", slot = currentSlot
if (counts[0].count == 0) and (counts[1].count == 0):
debug "No new attester's duties received", slot = currentSlot
let subscriptions =
block:
var res: seq[RestCommitteeSubscription]
for item in counts:
if item.count > 0:
for duty in vc.attesterDutiesForEpoch(item.epoch):
if currentSlot + SUBSCRIPTION_BUFFER_SLOTS < duty.data.slot:
let isAggregator =
if duty.slotSig.isSome():
is_aggregator(duty.data.committee_length,
duty.slotSig.get())
else:
false
let sub = RestCommitteeSubscription(
validator_index: duty.data.validator_index,
committee_index: duty.data.committee_index,
committees_at_slot: duty.data.committees_at_slot,
slot: duty.data.slot,
is_aggregator: isAggregator
)
res.add(sub)
res
if len(subscriptions) > 0:
let res = await vc.prepareBeaconCommitteeSubnet(subscriptions)
if res == 0:
warn "Failed to subscribe validators to beacon committee subnets",
slot = currentSlot, epoch = currentEpoch,
subscriptions_count = len(subscriptions)
vc.pruneAttesterDuties(currentEpoch)
proc pollForSyncCommitteeDuties* (vc: ValidatorClientRef) {.async.} =
let sres = vc.getCurrentSlot()
if sres.isSome():
let
currentSlot = sres.get()
currentEpoch = currentSlot.epoch()
if vc.attachedValidators[].count() != 0:
let
dutyPeriods =
block:
var res: seq[tuple[epoch: Epoch, period: SyncCommitteePeriod]]
let
currentPeriod = currentSlot.sync_committee_period()
lookaheadSlot = currentSlot +
SUBSCRIPTION_LOOKAHEAD_EPOCHS * SLOTS_PER_EPOCH
lookaheadPeriod = lookaheadSlot.sync_committee_period()
res.add(
(epoch: currentSlot.epoch(),
period: currentPeriod)
)
if lookAheadPeriod > currentPeriod:
res.add(
(epoch: lookaheadPeriod.start_epoch(),
period: lookAheadPeriod)
)
res
(counts, total) =
block:
var res: seq[tuple[epoch: Epoch, period: SyncCommitteePeriod,
count: int]]
var total = 0
if len(dutyPeriods) > 0:
for (epoch, period) in dutyPeriods:
let count = await vc.pollForSyncCommitteeDuties(epoch)
res.add((epoch: epoch, period: period, count: count))
total += count
(res, total)
if total == 0:
debug "No new sync committee member's duties received",
slot = currentSlot
let subscriptions =
block:
var res: seq[RestSyncCommitteeSubscription]
for item in counts:
if item.count > 0:
let untilEpoch = start_epoch(item.period + 1'u64)
let subscriptionsInfo =
vc.syncMembersSubscriptionInfoForEpoch(item.epoch)
for subInfo in subscriptionsInfo:
let sub = RestSyncCommitteeSubscription(
validator_index: subInfo.validator_index,
sync_committee_indices:
subInfo.validator_sync_committee_indices,
until_epoch: untilEpoch
let subscriptions =
block:
var res: seq[RestCommitteeSubscription]
for item in counts:
if item.count > 0:
for duty in vc.attesterDutiesForEpoch(item.epoch):
if currentSlot + SUBSCRIPTION_BUFFER_SLOTS < duty.data.slot:
let isAggregator =
if duty.slotSig.isSome():
is_aggregator(duty.data.committee_length,
duty.slotSig.get())
else:
false
let sub = RestCommitteeSubscription(
validator_index: duty.data.validator_index,
committee_index: duty.data.committee_index,
committees_at_slot: duty.data.committees_at_slot,
slot: duty.data.slot,
is_aggregator: isAggregator
)
res.add(sub)
res
if len(subscriptions) > 0:
let res = await vc.prepareBeaconCommitteeSubnet(subscriptions)
if res == 0:
warn "Failed to subscribe validators to beacon committee subnets",
slot = currentSlot, epoch = currentEpoch,
subscriptions_count = len(subscriptions)
service.pruneAttesterDuties(currentEpoch)
proc pollForSyncCommitteeDuties*(service: DutiesServiceRef) {.async.} =
let vc = service.client
let
currentSlot = vc.getCurrentSlot().get(Slot(0))
currentEpoch = currentSlot.epoch()
if vc.attachedValidators[].count() != 0:
let
dutyPeriods =
block:
var res: seq[tuple[epoch: Epoch, period: SyncCommitteePeriod]]
let
currentPeriod = currentSlot.sync_committee_period()
lookaheadSlot = currentSlot +
SUBSCRIPTION_LOOKAHEAD_EPOCHS * SLOTS_PER_EPOCH
lookaheadPeriod = lookaheadSlot.sync_committee_period()
res.add(
(epoch: currentSlot.epoch(),
period: currentPeriod)
)
if lookAheadPeriod > currentPeriod:
res.add(
(epoch: lookaheadPeriod.start_epoch(),
period: lookAheadPeriod)
)
res
if len(subscriptions) > 0:
let res = await vc.prepareSyncCommitteeSubnets(subscriptions)
if res != 0:
warn "Failed to subscribe validators to sync committee subnets",
slot = currentSlot, epoch = currentEpoch,
subscriptions_count = len(subscriptions)
(counts, total) =
block:
var res: seq[tuple[epoch: Epoch, period: SyncCommitteePeriod,
count: int]]
var total = 0
if len(dutyPeriods) > 0:
for (epoch, period) in dutyPeriods:
let count = await service.pollForSyncCommitteeDuties(epoch)
res.add((epoch: epoch, period: period, count: count))
total += count
(res, total)
vc.pruneSyncCommitteeDuties(currentSlot)
if total == 0:
debug "No new sync committee member's duties received",
slot = currentSlot
let subscriptions =
block:
var res: seq[RestSyncCommitteeSubscription]
for item in counts:
if item.count > 0:
let untilEpoch = start_epoch(item.period + 1'u64)
let subscriptionsInfo =
vc.syncMembersSubscriptionInfoForEpoch(item.epoch)
for subInfo in subscriptionsInfo:
let sub = RestSyncCommitteeSubscription(
validator_index: subInfo.validator_index,
sync_committee_indices:
subInfo.validator_sync_committee_indices,
until_epoch: untilEpoch
)
res.add(sub)
res
if len(subscriptions) > 0:
let res = await vc.prepareSyncCommitteeSubnets(subscriptions)
if res == 0:
warn "Failed to subscribe validators to sync committee subnets",
slot = currentSlot, epoch = currentEpoch,
subscriptions_count = len(subscriptions)
service.pruneSyncCommitteeDuties(currentSlot)
proc pruneBeaconProposers(service: DutiesServiceRef, epoch: Epoch) =
let vc = service.client
proc pruneBeaconProposers(vc: ValidatorClientRef, epoch: Epoch) =
var proposers: ProposerMap
for epochKey, data in vc.proposers:
if (epochKey + HISTORICAL_DUTIES_EPOCHS) >= epoch:
@ -484,94 +487,88 @@ proc pruneBeaconProposers(vc: ValidatorClientRef, epoch: Epoch) =
loop = ProposerLoop
vc.proposers = proposers
proc pollForBeaconProposers*(vc: ValidatorClientRef) {.async.} =
let sres = vc.getCurrentSlot()
if sres.isSome():
let
currentSlot = sres.get()
currentEpoch = currentSlot.epoch()
proc pollForBeaconProposers*(service: DutiesServiceRef) {.async.} =
let vc = service.client
let
currentSlot = vc.getCurrentSlot().get(Slot(0))
currentEpoch = currentSlot.epoch()
if vc.attachedValidators[].count() != 0:
try:
let res = await vc.getProposerDuties(currentEpoch,
ApiStrategyKind.First)
let
dependentRoot = res.dependent_root
duties = res.data
relevantDuties = duties.filterIt(it.pubkey in vc.attachedValidators[])
if vc.attachedValidators[].count() != 0:
try:
let res = await vc.getProposerDuties(currentEpoch,
ApiStrategyKind.First)
let
dependentRoot = res.dependent_root
duties = res.data
relevantDuties = duties.filterIt(it.pubkey in vc.attachedValidators[])
if len(relevantDuties) > 0:
vc.addOrReplaceProposers(currentEpoch, dependentRoot, relevantDuties)
else:
debug "No relevant proposer duties received", slot = currentSlot,
duties_count = len(duties)
except ValidatorApiError as exc:
warn "Unable to get proposer duties", slot = currentSlot,
if len(relevantDuties) > 0:
vc.addOrReplaceProposers(currentEpoch, dependentRoot, relevantDuties)
else:
debug "No relevant proposer duties received", slot = currentSlot,
duties_count = len(duties)
except ValidatorApiError as exc:
notice "Unable to get proposer duties", slot = currentSlot,
epoch = currentEpoch, reason = exc.getFailureReason()
except CancelledError as exc:
debug "Proposer duties processing was interrupted"
raise exc
except CatchableError as exc:
debug "Unexpected error occured while getting proposer duties",
slot = currentSlot, epoch = currentEpoch, err_name = exc.name,
err_msg = exc.msg
except CancelledError as exc:
debug "Proposer duties processing was interrupted"
raise exc
except CatchableError as exc:
debug "Unexpected error occured while getting proposer duties",
slot = currentSlot, epoch = currentEpoch, err_name = exc.name,
err_msg = exc.msg
vc.pruneBeaconProposers(currentEpoch)
service.pruneBeaconProposers(currentEpoch)
proc prepareBeaconProposers*(service: DutiesServiceRef) {.async.} =
let vc = service.client
let sres = vc.getCurrentSlot()
if sres.isSome():
let
currentSlot = sres.get()
currentEpoch = currentSlot.epoch()
proposers = vc.prepareProposersList(currentEpoch)
let
currentSlot = vc.getCurrentSlot().get(Slot(0))
currentEpoch = currentSlot.epoch()
proposers = vc.prepareProposersList(currentEpoch)
if len(proposers) > 0:
let count =
try:
await prepareBeaconProposer(vc, proposers)
except ValidatorApiError as exc:
warn "Unable to prepare beacon proposers", slot = currentSlot,
epoch = currentEpoch, err_name = exc.name,
err_msg = exc.msg, reason = exc.getFailureReason()
0
except CancelledError as exc:
debug "Beacon proposer preparation processing was interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error occured while preparing beacon proposers",
slot = currentSlot, epoch = currentEpoch, err_name = exc.name,
err_msg = exc.msg
0
debug "Beacon proposers prepared",
validators_count = vc.attachedValidators[].count(),
proposers_count = len(proposers),
prepared_count = count
if len(proposers) > 0:
let count =
try:
await prepareBeaconProposer(vc, proposers)
except ValidatorApiError as exc:
warn "Unable to prepare beacon proposers", slot = currentSlot,
epoch = currentEpoch, err_name = exc.name,
err_msg = exc.msg, reason = exc.getFailureReason()
0
except CancelledError as exc:
debug "Beacon proposer preparation processing was interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error occured while preparing beacon proposers",
slot = currentSlot, epoch = currentEpoch, err_name = exc.name,
err_msg = exc.msg
0
debug "Beacon proposers prepared",
validators_count = vc.attachedValidators[].count(),
proposers_count = len(proposers),
prepared_count = count
proc registerValidators*(service: DutiesServiceRef) {.async.} =
let vc = service.client
let sres = vc.getCurrentSlot()
let
currentSlot = vc.getCurrentSlot().get(Slot(0))
genesisFork = vc.forks[0]
registrations =
try:
await vc.prepareRegistrationList(getTime(), genesisFork)
except CancelledError as exc:
debug "Validator registration preparation was interrupted",
slot = currentSlot, fork = genesisFork
raise exc
except CatchableError as exc:
var default: seq[SignedValidatorRegistrationV1]
error "Unexpected error occured while preparing validators " &
"registration data", slot = currentSlot, fork = genesisFork,
err_name = exc.name, err_msg = exc.msg
default
var default: seq[SignedValidatorRegistrationV1]
if sres.isSome():
let
genesisFork = vc.forks[0]
currentSlot = sres.get()
registrations =
try:
await vc.prepareRegistrationList(getTime(), genesisFork)
except CancelledError as exc:
debug "Validator registration preparation was interrupted",
slot = currentSlot, fork = genesisFork
raise exc
except CatchableError as exc:
error "Unexpected error occured while preparing validators " &
"registration data", slot = currentSlot, fork = genesisFork,
err_name = exc.name, err_msg = exc.msg
default
let count =
count =
if len(registrations) > 0:
try:
await registerValidator(vc, registrations)
@ -592,10 +589,10 @@ proc registerValidators*(service: DutiesServiceRef) {.async.} =
else:
0
if count > 0:
debug "Validators registered", slot = currentSlot,
beacon_nodes_count = count, registrations = len(registrations),
validators_count = vc.attachedValidators[].count()
if count > 0:
debug "Validators registered", slot = currentSlot,
beacon_nodes_count = count, registrations = len(registrations),
validators_count = vc.attachedValidators[].count()
proc waitForNextSlot(service: DutiesServiceRef,
serviceLoop: DutiesServiceLoop) {.async.} =
@ -605,35 +602,45 @@ proc waitForNextSlot(service: DutiesServiceRef,
proc attesterDutiesLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
debug "Attester duties loop waiting for fork schedule update"
await vc.forksAvailable.wait()
debug "Attester duties loop is waiting for initialization"
await allFutures(
vc.preGenesisEvent.wait(),
vc.indicesAvailable.wait(),
vc.forksAvailable.wait()
)
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
while true:
await vc.pollForAttesterDuties()
await service.pollForAttesterDuties()
await service.waitForNextSlot(AttesterLoop)
proc proposerDutiesLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
debug "Proposer duties loop waiting for fork schedule update"
await vc.forksAvailable.wait()
debug "Proposer duties loop is waiting for initialization"
await allFutures(
vc.preGenesisEvent.wait(),
vc.indicesAvailable.wait(),
vc.forksAvailable.wait()
)
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
while true:
await vc.pollForBeaconProposers()
await service.pollForBeaconProposers()
await service.waitForNextSlot(ProposerLoop)
proc validatorIndexLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
debug "Validator indices loop is waiting for initialization"
await vc.preGenesisEvent.wait()
while true:
await vc.pollForValidatorIndices()
await service.pollForValidatorIndices()
await service.waitForNextSlot(IndicesLoop)
proc proposerPreparationsLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
debug "Beacon proposer preparation loop waiting for validator indices update"
await vc.indicesAvailable.wait()
debug "Beacon proposer preparation loop is waiting for initialization"
await allFutures(
vc.preGenesisEvent.wait(),
vc.indicesAvailable.wait()
)
while true:
await service.prepareBeaconProposers()
await service.waitForNextSlot(ProposerPreparationLoop)
@ -641,21 +648,28 @@ proc proposerPreparationsLoop(service: DutiesServiceRef) {.async.} =
proc validatorRegisterLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
doAssert(vc.config.payloadBuilderEnable)
debug "Validator registration loop is waiting for initialization"
await allFutures(vc.indicesAvailable.wait(), vc.forksAvailable.wait())
await allFutures(
vc.preGenesisEvent.wait(),
vc.indicesAvailable.wait(),
vc.forksAvailable.wait()
)
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
while true:
await service.registerValidators()
await service.waitForNextSlot(ValidatorRegisterLoop)
proc syncCommitteeDutiesLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
debug "Sync committee duties loop waiting for fork schedule update"
await vc.forksAvailable.wait()
debug "Sync committee duties loop is waiting for initialization"
await allFutures(
vc.preGenesisEvent.wait(),
vc.indicesAvailable.wait(),
vc.forksAvailable.wait()
)
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
while true:
await vc.pollForSyncCommitteeDuties()
await service.pollForSyncCommitteeDuties()
await service.waitForNextSlot(SyncCommitteeLoop)
template checkAndRestart(serviceLoop: DutiesServiceLoop,
@ -696,9 +710,13 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
# become safe to combine loops, breaks and exception handlers.
let breakLoop =
try:
var futures = @[FutureBase(attestFut), FutureBase(proposeFut),
FutureBase(indicesFut), FutureBase(syncFut),
FutureBase(prepareFut)]
var futures = @[
FutureBase(attestFut),
FutureBase(proposeFut),
FutureBase(indicesFut),
FutureBase(syncFut),
FutureBase(prepareFut)
]
if not(isNil(registerFut)): futures.add(FutureBase(registerFut))
discard await race(futures)
checkAndRestart(AttesterLoop, attestFut, service.attesterDutiesLoop())
@ -743,8 +761,6 @@ proc init*(t: typedesc[DutiesServiceRef],
let res = DutiesServiceRef(name: ServiceName,
client: vc, state: ServiceState.Initialized)
debug "Initializing service"
# We query for indices first, to avoid empty queries for duties.
await vc.pollForValidatorIndices()
return res
proc start*(service: DutiesServiceRef) =

View File

@ -34,6 +34,10 @@ proc otherNodes*(vc: ValidatorClientRef): seq[BeaconNodeServerRef] =
proc otherNodesCount*(vc: ValidatorClientRef): int =
  ## Number of beacon nodes whose status is anything other than `Synced`.
  var total = 0
  for node in vc.beaconNodes:
    if node.status != RestBeaconNodeStatus.Synced:
      inc(total)
  total
proc preGenesisNodes*(vc: ValidatorClientRef): seq[BeaconNodeServerRef] =
  ## Beacon nodes whose status is neither `Synced` nor `OptSynced`.
  var nodes: seq[BeaconNodeServerRef]
  for node in vc.beaconNodes:
    if node.status notin {RestBeaconNodeStatus.Synced,
                          RestBeaconNodeStatus.OptSynced}:
      nodes.add(node)
  nodes
proc waitNodes*(vc: ValidatorClientRef, timeoutFut: Future[void],
statuses: set[RestBeaconNodeStatus],
roles: set[BeaconNodeRole], waitChanges: bool) {.async.} =
@ -230,7 +234,12 @@ proc checkNode(vc: ValidatorClientRef,
proc checkNodes*(service: FallbackServiceRef): Future[bool] {.async.} =
let
nodesToCheck = service.client.otherNodes()
vc = service.client
nodesToCheck =
if vc.genesisEvent.isSet():
service.client.otherNodes()
else:
service.client.preGenesisNodes()
pendingChecks = nodesToCheck.mapIt(service.client.checkNode(it))
var res = false
try:
@ -252,6 +261,16 @@ proc mainLoop(service: FallbackServiceRef) {.async.} =
service.state = ServiceState.Running
debug "Service started"
try:
await vc.preGenesisEvent.wait()
except CancelledError:
debug "Service interrupted"
return
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
return
while true:
# This loop could look much more nicer/better, when
# https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could
@ -279,8 +298,6 @@ proc init*(t: typedesc[FallbackServiceRef],
state: ServiceState.Initialized,
changesEvent: newAsyncEvent())
debug "Initializing service"
# Perform initial nodes check.
if await res.checkNodes(): res.changesEvent.fire()
return res
proc start*(service: FallbackServiceRef) =

View File

@ -43,39 +43,33 @@ proc sortForks(forks: openArray[Fork]): Result[seq[Fork], cstring] {.
ok(sortedForks)
proc pollForFork(vc: ValidatorClientRef) {.async.} =
let sres = vc.getCurrentSlot()
if sres.isSome():
let
currentSlot = sres.get()
currentEpoch = currentSlot.epoch()
let forks =
try:
await vc.getForkSchedule(ApiStrategyKind.Best)
except ValidatorApiError as exc:
warn "Unable to retrieve fork schedule",
reason = exc.getFailureReason(), err_msg = exc.msg
return
except CancelledError as exc:
debug "Fork retrieval process was interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error occured while getting fork information",
err_name = exc.name, err_msg = exc.msg
return
let forks =
try:
await vc.getForkSchedule(ApiStrategyKind.Best)
except ValidatorApiError as exc:
warn "Unable to retrieve fork schedule",
reason = exc.getFailureReason(), err_msg = exc.msg
return
except CancelledError as exc:
debug "Fork retrieval process was interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error occured while getting fork information",
err_name = exc.name, err_msg = exc.msg
let sortedForks =
block:
let res = sortForks(forks)
if res.isErr():
warn "Invalid fork schedule received", reason = res.error()
return
res.get()
let sortedForks =
block:
let res = sortForks(forks)
if res.isErr():
warn "Invalid fork schedule received", reason = res.error()
return
res.get()
if (len(vc.forks) == 0) or (vc.forks != sortedForks):
vc.forks = sortedForks
notice "Fork schedule updated", fork_schedule = sortedForks
vc.forksAvailable.fire()
if (len(vc.forks) == 0) or (vc.forks != sortedForks):
vc.forks = sortedForks
notice "Fork schedule updated", fork_schedule = sortedForks
vc.forksAvailable.fire()
proc waitForNextEpoch(service: ForkServiceRef) {.async.} =
let vc = service.client
@ -88,6 +82,16 @@ proc mainLoop(service: ForkServiceRef) {.async.} =
service.state = ServiceState.Running
debug "Service started"
try:
await vc.preGenesisEvent.wait()
except CancelledError:
debug "Service interrupted"
return
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
return
while true:
# This loop could look much more nicer/better, when
# https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could
@ -114,7 +118,6 @@ proc init*(t: typedesc[ForkServiceRef],
let res = ForkServiceRef(name: ServiceName,
client: vc, state: ServiceState.Initialized)
debug "Initializing service"
await vc.pollForFork()
return res
proc start*(service: ForkServiceRef) =

View File

@ -406,36 +406,68 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
debug "Producing contribution and proofs", delay = delay
await service.produceAndPublishContributions(slot, beaconBlockRoot, duties)
proc spawnSyncCommitteeTasks(service: SyncCommitteeServiceRef, slot: Slot) =
proc processSyncCommitteeTasks(service: SyncCommitteeServiceRef,
slot: Slot) {.async.} =
let
vc = service.client
duties = vc.getSyncCommitteeDutiesForSlot(slot + 1)
timeout = vc.beaconClock.durationToNextSlot()
asyncSpawn service.publishSyncMessagesAndContributions(slot, duties)
try:
await service.publishSyncMessagesAndContributions(slot,
duties).wait(timeout)
except AsyncTimeoutError:
warn "Unable to publish sync committee messages and contributions in time",
slot = slot, timeout = timeout
except CancelledError as exc:
debug "Sync committee publish task has been interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error encountered while processing sync committee tasks",
error_name = exc.name, error_message = exc.msg
proc mainLoop(service: SyncCommitteeServiceRef) {.async.} =
let vc = service.client
service.state = ServiceState.Running
debug "Service started"
debug "Sync committee duties loop waiting for fork schedule update"
await vc.forksAvailable.wait()
debug "Sync committee processing loop is waiting for initialization"
try:
await allFutures(
vc.preGenesisEvent.wait(),
vc.genesisEvent.wait(),
vc.indicesAvailable.wait(),
vc.forksAvailable.wait()
)
except CancelledError:
debug "Service interrupted"
return
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
return
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
var currentSlot: Opt[Slot]
while true:
# This loop could look much more nicer/better, when
# https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could
# become safe to combine loops, breaks and exception handlers.
let breakLoop =
try:
let sleepTime =
syncCommitteeMessageSlotOffset + vc.beaconClock.durationToNextSlot()
let sres = vc.getCurrentSlot()
if sres.isSome():
let currentSlot = sres.get()
service.spawnSyncCommitteeTasks(currentSlot)
await sleepAsync(sleepTime)
false
let
# We use zero offset here, because we do waiting in
# waitForBlockPublished(syncCommitteeMessageSlotOffset).
slot = await vc.checkedWaitForNextSlot(currentSlot, ZeroTimeDiff,
false)
if slot.isNone():
debug "System time adjusted backwards significantly, exiting"
true
else:
currentSlot = slot
await service.processSyncCommitteeTasks(currentSlot.get())
false
except CancelledError:
debug "Service interrupted"
true