nimbus-eth2/beacon_chain/nimbus_validator_client.nim

346 lines
14 KiB
Nim
Raw Normal View History

# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
# Standard library
std/[os, random, strutils],
# Nimble packages
2020-07-14 01:01:23 +00:00
stew/shims/[tables, macros],
chronos, confutils, metrics,
chronicles,
json_serialization/std/[options, net],
# Local modules
./spec/[datatypes, digest, crypto, helpers, network, signatures],
./spec/eth2_apis/beacon_rpc_client,
./sync/sync_manager,
"."/[conf, beacon_clock, version],
./networking/[eth2_network, eth2_discovery],
./beacon_node_types,
./nimbus_binary_common,
./ssz/merkleization,
./spec/eth2_apis/callsigs_types,
./validators/[attestation_aggregation, keystore_management, validator_pool, slashing_protection],
./eth/db/[kvstore, kvstore_sqlite3],
./eth/keys, ./eth/p2p/discoveryv5/random2
logScope: topics = "vc"
type
ValidatorClient = ref object
config: ValidatorClientConf
graffitiBytes: GraffitiBytes
client: RpcHttpClient
beaconClock: BeaconClock
attachedValidators: ValidatorPool
fork: Fork
proposalsForCurrentEpoch: Table[Slot, ValidatorPubKey]
attestationsForEpoch: Table[Epoch, Table[Slot, seq[AttesterDuties]]]
beaconGenesis: BeaconGenesisTuple
template attemptUntilSuccess(vc: ValidatorClient, body: untyped) =
while true:
try:
body
break
except CatchableError as err:
warn "Caught an unexpected error", err = err.msg
waitFor sleepAsync(chronos.seconds(vc.config.retryDelay))
proc getValidatorDutiesForEpoch(vc: ValidatorClient, epoch: Epoch) {.gcsafe, async.} =
info "Getting validator duties for epoch", epoch = epoch
let proposals = await vc.client.get_v1_validator_duties_proposer(epoch)
# update the block proposal duties this VC should do during this epoch
vc.proposalsForCurrentEpoch.clear()
for curr in proposals:
if vc.attachedValidators.validators.contains curr.public_key:
vc.proposalsForCurrentEpoch.add(curr.slot, curr.public_key)
# couldn't use mapIt in ANY shape or form so reverting to raw loops - sorry Sean Parent :|
var validatorPubkeys: seq[ValidatorPubKey]
for key in vc.attachedValidators.validators.keys:
validatorPubkeys.add key
proc getAttesterDutiesForEpoch(epoch: Epoch) {.gcsafe, async.} =
# make sure there's an entry
if not vc.attestationsForEpoch.contains epoch:
vc.attestationsForEpoch.add(epoch, Table[Slot, seq[AttesterDuties]]())
let attestations = await vc.client.get_v1_validator_duties_attester(
epoch, validatorPubkeys)
for a in attestations:
if vc.attestationsForEpoch[epoch].hasKeyOrPut(a.slot, @[a]):
vc.attestationsForEpoch[epoch][a.slot].add(a)
# clear both for the current epoch and the next because a change of
# fork could invalidate the attester duties even the current epoch
vc.attestationsForEpoch.clear()
await getAttesterDutiesForEpoch(epoch)
# obtain the attestation duties this VC should do during the next epoch
await getAttesterDutiesForEpoch(epoch + 1)
# for now we will get the fork each time we update the validator duties for each epoch
# TODO should poll occasionally `/v1/config/fork_schedule`
vc.fork = await vc.client.get_v1_beacon_states_fork("head")
var numAttestationsForEpoch = 0
for _, dutiesForSlot in vc.attestationsForEpoch[epoch]:
numAttestationsForEpoch += dutiesForSlot.len
info "Got validator duties for epoch",
num_proposals = vc.proposalsForCurrentEpoch.len,
num_attestations = numAttestationsForEpoch
proc onSlotStart(vc: ValidatorClient, lastSlot, scheduledSlot: Slot) {.gcsafe, async.} =
let
# The slot we should be at, according to the clock
beaconTime = vc.beaconClock.now()
wallSlot = beaconTime.toSlot()
let
slot = wallSlot.slot # afterGenesis == true!
nextSlot = slot + 1
epoch = slot.compute_epoch_at_slot
info "Slot start",
lastSlot = shortLog(lastSlot),
scheduledSlot = shortLog(scheduledSlot),
beaconTime = shortLog(beaconTime),
portBN = vc.config.rpcPort
# Check before any re-scheduling of onSlotStart()
checkIfShouldStopAtEpoch(scheduledSlot, vc.config.stopAtEpoch)
try:
# at the start of each epoch - request all validator duties
# TODO perhaps call this not on the first slot of each Epoch but perhaps
# 1 slot earlier because there are a few back-and-forth requests which
# could take up time for attesting... Perhaps this should be called more
# than once per epoch because of forks & other events...
#
# calling it before epoch n starts means one can't ensure knowing about
# epoch n+1.
if slot.isEpoch:
await getValidatorDutiesForEpoch(vc, epoch)
# check if we have a validator which needs to propose on this slot
if vc.proposalsForCurrentEpoch.contains slot:
let public_key = vc.proposalsForCurrentEpoch[slot]
let notSlashable = vc.attachedValidators
.slashingProtection
.checkSlashableBlockProposal(public_key, slot)
if notSlashable.isOk:
let validator = vc.attachedValidators.validators[public_key]
notice "Proposing block", slot = slot, public_key = public_key
let randao_reveal = await validator.genRandaoReveal(
vc.fork, vc.beaconGenesis.genesis_validators_root, slot)
var newBlock = SignedBeaconBlock(
message: await vc.client.get_v1_validator_block(slot, vc.graffitiBytes, randao_reveal)
)
newBlock.root = hash_tree_root(newBlock.message)
# TODO: signing_root is recomputed in signBlockProposal just after
let signing_root = compute_block_root(vc.fork, vc.beaconGenesis.genesis_validators_root, slot, newBlock.root)
vc.attachedValidators
.slashingProtection
.registerBlock(public_key, slot, signing_root)
newBlock.signature = await validator.signBlockProposal(
vc.fork, vc.beaconGenesis.genesis_validators_root, slot, newBlock.root)
discard await vc.client.post_v1_validator_block(newBlock)
else:
warn "Slashing protection activated for block proposal",
validator = public_key,
slot = slot,
existingProposal = notSlashable.error
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#attesting
# A validator should create and broadcast the attestation to the associated
# attestation subnet when either (a) the validator has received a valid
# block from the expected block proposer for the assigned slot or
# (b) one-third of the slot has transpired (`SECONDS_PER_SLOT / 3` seconds
# after the start of slot) -- whichever comes first.
discard await vc.beaconClock.sleepToSlotOffset(
seconds(int64(SECONDS_PER_SLOT)) div 3, slot, "Waiting to send attestations")
# check if we have validators which need to attest on this slot
if vc.attestationsForEpoch.contains(epoch) and
vc.attestationsForEpoch[epoch].contains slot:
var validatorToAttestationDataRoot: Table[ValidatorPubKey, Eth2Digest]
for a in vc.attestationsForEpoch[epoch][slot]:
let validator = vc.attachedValidators.validators[a.public_key]
let ad = await vc.client.get_v1_validator_attestation_data(slot, a.committee_index)
let notSlashable = vc.attachedValidators
.slashingProtection
.checkSlashableAttestation(
a.public_key,
ad.source.epoch,
ad.target.epoch)
if notSlashable.isOk():
# TODO signing_root is recomputed in produceAndSignAttestation/signAttestation just after
let signing_root = compute_attestation_root(
vc.fork, vc.beaconGenesis.genesis_validators_root, ad)
vc.attachedValidators
.slashingProtection
.registerAttestation(
a.public_key, ad.source.epoch, ad.target.epoch, signing_root)
# TODO I don't like these (u)int64-to-int conversions...
let attestation = await validator.produceAndSignAttestation(
ad, a.committee_length.int, a.validator_committee_index,
vc.fork, vc.beaconGenesis.genesis_validators_root)
notice "Attesting",
slot, public_key = a.public_key, attestation = shortLog(attestation)
discard await vc.client.post_v1_beacon_pool_attestations(attestation)
validatorToAttestationDataRoot[a.public_key] = attestation.data.hash_tree_root
else:
warn "Slashing protection activated for attestation",
validator = a.public_key,
badVoteDetails = $notSlashable.error
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#broadcast-aggregate
# If the validator is selected to aggregate (is_aggregator), then they
# broadcast their best aggregate as a SignedAggregateAndProof to the global
# aggregate channel (beacon_aggregate_and_proof) two-thirds of the way
# through the slot-that is, SECONDS_PER_SLOT * 2 / 3 seconds after the start
# of slot.
if slot > 2:
discard await vc.beaconClock.sleepToSlotOffset(
seconds(int64(SECONDS_PER_SLOT * 2)) div 3, slot,
"Waiting to aggregate attestations")
# loop again over all of our validators which need to attest on
# this slot and check if we should also aggregate attestations
for a in vc.attestationsForEpoch[epoch][slot]:
let validator = vc.attachedValidators.validators[a.public_key]
let slot_signature = await getSlotSig(validator, vc.fork,
vc.beaconGenesis.genesis_validators_root, slot)
if is_aggregator(a.committee_length, slot_signature) and
validatorToAttestationDataRoot.contains(a.public_key):
notice "Aggregating", slot = slot, public_key = a.public_key
let aa = await vc.client.get_v1_validator_aggregate_attestation(
slot, validatorToAttestationDataRoot[a.public_key])
let aap = AggregateAndProof(aggregator_index: a.validator_index.uint64,
aggregate: aa, selection_proof: slot_signature)
let sig = await signAggregateAndProof(validator,
aap, vc.fork, vc.beaconGenesis.genesis_validators_root)
var signedAP = SignedAggregateAndProof(message: aap, signature: sig)
discard await vc.client.post_v1_validator_aggregate_and_proofs(signedAP)
except CatchableError as err:
warn "Caught an unexpected error", err = err.msg, slot = shortLog(slot)
let
nextSlotStart = saturate(vc.beaconClock.fromNow(nextSlot))
info "Slot end",
slot = shortLog(slot),
nextSlot = shortLog(nextSlot),
portBN = vc.config.rpcPort
when declared(GC_fullCollect):
# The slots in the validator client work as frames in a game: we want to make
# sure that we're ready for the next one and don't get stuck in lengthy
# garbage collection tasks when time is of essence in the middle of a slot -
# while this does not guarantee that we'll never collect during a slot, it
# makes sure that all the scratch space we used during slot tasks (logging,
# temporary buffers etc) gets recycled for the next slot that is likely to
# need similar amounts of memory.
GC_fullCollect()
if (slot - 2).isEpoch and (slot.epoch + 1) in vc.attestationsForEpoch:
for slot, attesterDuties in vc.attestationsForEpoch[slot.epoch + 1].pairs:
for ad in attesterDuties:
let
validator = vc.attachedValidators.validators[ad.public_key]
sig = await validator.getSlotSig(
vc.fork, vc.beaconGenesis.genesis_validators_root, slot)
discard await vc.client.post_v1_validator_beacon_committee_subscriptions(
ad.committee_index, ad.slot, true, ad.public_key, sig)
addTimer(nextSlotStart) do (p: pointer):
asyncCheck vc.onSlotStart(slot, nextSlot)
{.pop.} # TODO moduletests exceptions
programMain:
let config = makeBannerAndConfig("Nimbus validator client " & fullVersionStr, ValidatorClientConf)
setupStdoutLogging(config.logLevel)
setupLogging(config.logLevel, config.logFile)
# Doesn't use std/random directly, but dependencies might
let rng = keys.newRng()
if rng.isNil:
randomize()
else:
randomize(rng[].rand(high(int)))
case config.cmd
of VCNoCommand:
debug "Launching validator client",
version = fullVersionStr,
cmdParams = commandLineParams(),
config
var vc = ValidatorClient(
config: config,
client: newRpcHttpClient(),
graffitiBytes: if config.graffiti.isSome: config.graffiti.get
else: defaultGraffitiBytes())
# load all the validators from the data dir into memory
for curr in vc.config.validatorKeys:
2020-11-27 23:34:25 +00:00
vc.attachedValidators.addLocalValidator(
performance fixes (#2259) * performance fixes * don't mark tree cache as dirty on read-only List accesses * store only blob in memory for keys and signatures, parse blob lazily * compare public keys by blob instead of parsing / converting to raw * compare Eth2Digest using non-constant-time comparison * avoid some unnecessary validator copying This branch will in particular speed up deposit processing which has been slowing down block replay. Pre (mainnet, 1600 blocks): ``` All time are ms Average, StdDev, Min, Max, Samples, Test Validation is turned off meaning that no BLS operations are performed 3450.269, 0.000, 3450.269, 3450.269, 1, Initialize DB 0.417, 0.822, 0.036, 21.098, 1400, Load block from database 16.521, 0.000, 16.521, 16.521, 1, Load state from database 27.906, 50.846, 8.104, 1507.633, 1350, Apply block 52.617, 37.029, 20.640, 135.938, 50, Apply epoch block ``` Post: ``` 3502.715, 0.000, 3502.715, 3502.715, 1, Initialize DB 0.080, 0.560, 0.035, 21.015, 1400, Load block from database 17.595, 0.000, 17.595, 17.595, 1, Load state from database 15.706, 11.028, 8.300, 107.537, 1350, Apply block 33.217, 12.622, 17.331, 60.580, 50, Apply epoch block ``` * more perf fixes * load EpochRef cache into StateCache more aggressively * point out security concern with public key cache * reuse proposer index from state when processing block * avoid genericAssign in a few more places * don't parse key when signature is unparseable * fix `==` overload for Eth2Digest * preallocate validator list when getting active validators * speed up proposer index calculation a little bit * reuse cache when replaying blocks in ncli_db * avoid a few more copying loops ``` Average, StdDev, Min, Max, Samples, Test Validation is turned off meaning that no BLS operations are performed 3279.158, 0.000, 3279.158, 3279.158, 1, Initialize DB 0.072, 0.357, 0.035, 13.400, 1400, Load block from database 17.295, 0.000, 17.295, 17.295, 1, Load state from database 5.918, 9.896, 0.198, 98.028, 1350, Apply block 15.888, 10.951, 7.902, 39.535, 50, Apply epoch block 0.000, 0.000, 0.000, 0.000, 0, Database block store ``` * clear full balance cache before processing rewards and penalties ``` All time are ms Average, StdDev, Min, Max, Samples, Test Validation is turned off meaning that no BLS operations are performed 3947.901, 0.000, 3947.901, 3947.901, 1, Initialize DB 0.124, 0.506, 0.026, 202.370, 363345, Load block from database 97.614, 0.000, 97.614, 97.614, 1, Load state from database 0.186, 0.188, 0.012, 99.561, 357262, Advance slot, non-epoch 14.161, 5.966, 1.099, 395.511, 11524, Advance slot, epoch 1.372, 4.170, 0.017, 276.401, 363345, Apply block, no slot processing 0.000, 0.000, 0.000, 0.000, 0, Database block store ```
2021-01-25 12:04:18 +00:00
curr.toPubKey, curr, none(ValidatorIndex))
waitFor vc.client.connect($vc.config.rpcAddress, vc.config.rpcPort)
info "Connected to BN",
port = vc.config.rpcPort,
address = vc.config.rpcAddress
vc.attemptUntilSuccess:
# init the beacon clock
vc.beaconGenesis = waitFor vc.client.get_v1_beacon_genesis()
vc.beaconClock = BeaconClock.init(vc.beaconGenesis.genesis_time)
vc.attachedValidators.slashingProtection =
SlashingProtectionDB.init(
vc.beaconGenesis.genesis_validators_root,
config.validatorsDir(), "slashing_protection"
)
let
curSlot = vc.beaconClock.now().slotOrZero()
nextSlot = curSlot + 1 # No earlier than GENESIS_SLOT + 1
fromNow = saturate(vc.beaconClock.fromNow(nextSlot))
vc.attemptUntilSuccess:
waitFor vc.getValidatorDutiesForEpoch(curSlot.compute_epoch_at_slot)
info "Scheduling first slot action",
beaconTime = shortLog(vc.beaconClock.now()),
nextSlot = shortLog(nextSlot),
fromNow = shortLog(fromNow)
addTimer(fromNow) do (p: pointer) {.gcsafe.}:
asyncCheck vc.onSlotStart(curSlot, nextSlot)
runForever()