New validator client using REST API. (#2651)

* Initial commit.

* Exporting getConfig().

* Add beacon node checking procedures.

* Post rebase fixes.

* Use runSlotLoop() from nimbus_beacon_node.
Fallback implementation.
Fixes for ETH2 REST serialization.

* Add beacon_clock.durationToNextSlot().
Move type declarations from beacon_rest_api to json_rest_serialization.
Fix seq[ValidatorIndex] serialization.
Refactor ValidatorPool and add some utility procedures.
Create separate version of validator_client.

* Post-rebase fixes.
Remove CookedPubKey from validator_pool.nim.

* Now we should be able to produce attestations and aggregate and proofs.
But its not working yet.

* Debugging attestation sending.

* Add durationToNextAttestation.
Optimize some debug logs.
Fix aggregation_bits encoding.
Bump chronos/presto.

* Its alive.

* Fixes for launch_local_testnet script.
Bump chronos.

* Switch client API to not use `/api` prefix.

* Post-rebase adjustments.

* Fix endpoint for publishBlock().

* Add CONFIG_NAME.
Add more checks to ensure that beacon_node is compatible.

* Add beacon committee subscription support to validator_client.

* Fix stacktrace should be an array of strings.
Fix committee subscriptions should not be `data` keyed.

* Log duration to next block proposal.

* Fix beacon_node_status import.

* Use jsonMsgResponse() instead of jsonError().

* Fix graffityBytes usage.
Remove unnecessary `await`.
Adjust creation of SignedBlock instance.
Remove legacy files.

* Rework durationToNextSlot() and durationToNextEpoch() to use `fromNow`.

* Fix race condition for block proposal and attestations for same slot.
Fix local_testnet script to properly kill tasks on Windows.
Bump chronos and nim-http-tools, to allow connections to infura.io (basic auth).

* Catch services errors.
Improve performance of local_testnet.sh script on Windows.
Fix race condition when attestation producing.

* Post-rebase fixes.

* Bump chronos and presto.

* Calculate block publishing delay.
Fix pkill in one more place.

* Add error handling and timeouts to firstSuccess() template.
Add onceToAll() template.
Add checkNodes() procedure.
Refactor firstSuccess() template.
Add error checking to api.nim calls.

* Deprecated usage onceToAll() for better stability.
Address comment and send attestations asap.

* Avoid unnecessary loop when calculating minimal duration.
This commit is contained in:
Eugene Kabanov 2021-07-13 14:15:07 +03:00 committed by GitHub
parent 81d54a3c89
commit 3b6f4fab4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 2828 additions and 561 deletions

View File

@ -9,7 +9,7 @@
import
chronos, chronicles,
./spec/datatypes
./spec/[datatypes, helpers]
from times import Time, getTime, fromUnix, `<`, `-`, inNanoseconds
@ -102,6 +102,20 @@ proc fromNow*(c: BeaconClock, t: BeaconTime): tuple[inFuture: bool, offset: Dura
proc fromNow*(c: BeaconClock, slot: Slot): tuple[inFuture: bool, offset: Duration] =
c.fromNow(slot.toBeaconTime())
proc durationToNextSlot*(c: BeaconClock): Duration =
let (afterGenesis, slot) = c.now().toSlot()
if afterGenesis:
c.fromNow(Slot(slot) + 1'u64).offset
else:
c.fromNow(Slot(0)).offset
proc durationToNextEpoch*(c: BeaconClock): Duration =
let (afterGenesis, slot) = c.now().toSlot()
if afterGenesis:
c.fromNow(compute_start_slot_at_epoch(slot.epoch() + 1'u64)).offset
else:
c.fromNow(compute_start_slot_at_epoch(Epoch(0))).offset
func saturate*(d: tuple[inFuture: bool, offset: Duration]): Duration =
if d.inFuture: d.offset else: seconds(0)

View File

@ -116,7 +116,7 @@ type
pubKeyStr*: string
AttachedValidator* = ref object
pubKey*: CookedPubKey
pubKey*: ValidatorPubKey
case kind*: ValidatorKind
of inProcess:

View File

@ -548,7 +548,8 @@ type
of VCNoCommand:
graffiti* {.
desc: "The graffiti value that will appear in proposed blocks. " &
"You can use a 0x-prefixed hex encoded string to specify raw bytes"
"You can use a 0x-prefixed hex encoded string to specify " &
"raw bytes"
name: "graffiti" }: Option[GraffitiBytes]
stopAtEpoch* {.
@ -556,22 +557,9 @@ type
defaultValue: 0
name: "stop-at-epoch" }: uint64
rpcPort* {.
desc: "HTTP port of the server to connect to for RPC"
defaultValue: defaultEth2RpcPort
defaultValueDesc: "9190"
name: "rpc-port" }: Port
rpcAddress* {.
desc: "Address of the server to connect to for RPC"
defaultValue: init(ValidIpAddress, "127.0.0.1")
defaultValueDesc: "127.0.0.1"
name: "rpc-address" }: ValidIpAddress
retryDelay* {.
desc: "Delay in seconds between retries after unsuccessful attempts to connect to a beacon node [=10]"
defaultValue: 10
name: "retry-delay" }: int
beaconNodes* {.
desc: "URL addresses to one or more beacon node HTTP REST APIs",
name: "beacon-node" }: seq[string]
proc defaultDataDir*(config: BeaconNodeConf|ValidatorClientConf): string =
let dataDir = when defined(windows):

View File

@ -1013,76 +1013,6 @@ proc onSlotStart(
await onSlotEnd(node, wallSlot)
proc runSlotLoop(node: BeaconNode, startTime: BeaconTime) {.async.} =
var
curSlot = startTime.slotOrZero()
nextSlot = curSlot + 1 # No earlier than GENESIS_SLOT + 1
timeToNextSlot = nextSlot.toBeaconTime() - startTime
info "Scheduling first slot action",
startTime = shortLog(startTime),
nextSlot = shortLog(nextSlot),
timeToNextSlot = shortLog(timeToNextSlot)
while true:
# Start by waiting for the time when the slot starts. Sleeping relinquishes
# control to other tasks which may or may not finish within the alotted
# time, so below, we need to be wary that the ship might have sailed
# already.
await sleepAsync(timeToNextSlot)
let
wallTime = node.dag.beaconClock.now()
wallSlot = wallTime.slotOrZero() # Always > GENESIS!
if wallSlot < nextSlot:
# While we were sleeping, the system clock changed and time moved
# backwards!
if wallSlot + 1 < nextSlot:
# This is a critical condition where it's hard to reason about what
# to do next - we'll call the attention of the user here by shutting
# down.
fatal "System time adjusted backwards significantly - clock may be inaccurate - shutting down",
nextSlot = shortLog(nextSlot),
wallSlot = shortLog(wallSlot)
bnStatus = BeaconNodeStatus.Stopping
return
# Time moved back by a single slot - this could be a minor adjustment,
# for example when NTP does its thing after not working for a while
warn "System time adjusted backwards, rescheduling slot actions",
wallTime = shortLog(wallTime),
nextSlot = shortLog(nextSlot),
wallSlot = shortLog(wallSlot)
# cur & next slot remain the same
timeToNextSlot = nextSlot.toBeaconTime() - wallTime
continue
if wallSlot > nextSlot + SLOTS_PER_EPOCH:
# Time moved forwards by more than an epoch - either the clock was reset
# or we've been stuck in processing for a long time - either way, we will
# skip ahead so that we only process the events of the last
# SLOTS_PER_EPOCH slots
warn "Time moved forwards by more than an epoch, skipping ahead",
curSlot = shortLog(curSlot),
nextSlot = shortLog(nextSlot),
wallSlot = shortLog(wallSlot)
curSlot = wallSlot - SLOTS_PER_EPOCH
elif wallSlot > nextSlot:
notice "Missed expected slot start, catching up",
delay = shortLog(wallTime - nextSlot.toBeaconTime()),
curSlot = shortLog(curSlot),
nextSlot = shortLog(curSlot)
await onSlotStart(node, wallTime, curSlot)
curSlot = wallSlot
nextSlot = wallSlot + 1
timeToNextSlot = saturate(node.dag.beaconClock.fromNow(nextSlot))
proc handleMissingBlocks(node: BeaconNode) =
let missingBlocks = node.quarantine.checkMissing()
if missingBlocks.len > 0:
@ -1244,7 +1174,7 @@ proc run*(node: BeaconNode) {.raises: [Defect, CatchableError].} =
node.installMessageValidators()
let startTime = node.dag.beaconClock.now()
asyncSpawn runSlotLoop(node, startTime)
asyncSpawn runSlotLoop(node, startTime, onSlotStart)
asyncSpawn runOnSecondLoop(node)
asyncSpawn runQueueProcessingLoop(node.blockProcessor)

View File

@ -20,11 +20,16 @@ import
# Local modules
./spec/[crypto, helpers], ./spec/datatypes/base, beacon_clock, filepath,
./networking/eth2_network
./beacon_node_status, ./networking/eth2_network
when defined(posix):
import termios
type
SlotStartProc*[T] = proc(node: T, wallTime: BeaconTime,
lastSlot: Slot): Future[void] {.gcsafe,
raises: [Defect].}
proc setupStdoutLogging*(logLevel: string) =
when compiles(defaultChroniclesStream.output.writer):
defaultChroniclesStream.outputs[0].writer =
@ -128,3 +133,73 @@ proc resetStdin*() =
attrs.c_lflag = attrs.c_lflag or Cflag(ECHO)
discard fd.tcSetAttr(TCSANOW, attrs.addr)
proc runSlotLoop*[T](node: T, startTime: BeaconTime,
slotProc: SlotStartProc[T]) {.async.} =
var
curSlot = startTime.slotOrZero()
nextSlot = curSlot + 1 # No earlier than GENESIS_SLOT + 1
timeToNextSlot = nextSlot.toBeaconTime() - startTime
info "Scheduling first slot action",
startTime = shortLog(startTime),
nextSlot = shortLog(nextSlot),
timeToNextSlot = shortLog(timeToNextSlot)
while true:
# Start by waiting for the time when the slot starts. Sleeping relinquishes
# control to other tasks which may or may not finish within the alotted
# time, so below, we need to be wary that the ship might have sailed
# already.
await sleepAsync(timeToNextSlot)
let
wallTime = node.beaconClock.now()
wallSlot = wallTime.slotOrZero() # Always > GENESIS!
if wallSlot < nextSlot:
# While we were sleeping, the system clock changed and time moved
# backwards!
if wallSlot + 1 < nextSlot:
# This is a critical condition where it's hard to reason about what
# to do next - we'll call the attention of the user here by shutting
# down.
fatal "System time adjusted backwards significantly - clock may be inaccurate - shutting down",
nextSlot = shortLog(nextSlot),
wallSlot = shortLog(wallSlot)
bnStatus = BeaconNodeStatus.Stopping
return
# Time moved back by a single slot - this could be a minor adjustment,
# for example when NTP does its thing after not working for a while
warn "System time adjusted backwards, rescheduling slot actions",
wallTime = shortLog(wallTime),
nextSlot = shortLog(nextSlot),
wallSlot = shortLog(wallSlot)
# cur & next slot remain the same
timeToNextSlot = nextSlot.toBeaconTime() - wallTime
continue
if wallSlot > nextSlot + SLOTS_PER_EPOCH:
# Time moved forwards by more than an epoch - either the clock was reset
# or we've been stuck in processing for a long time - either way, we will
# skip ahead so that we only process the events of the last
# SLOTS_PER_EPOCH slots
warn "Time moved forwards by more than an epoch, skipping ahead",
curSlot = shortLog(curSlot),
nextSlot = shortLog(nextSlot),
wallSlot = shortLog(wallSlot)
curSlot = wallSlot - SLOTS_PER_EPOCH
elif wallSlot > nextSlot:
notice "Missed expected slot start, catching up",
delay = shortLog(wallTime - nextSlot.toBeaconTime()),
curSlot = shortLog(curSlot),
nextSlot = shortLog(curSlot)
await slotProc(node, wallTime, curSlot)
curSlot = wallSlot
nextSlot = wallSlot + 1
timeToNextSlot = saturate(node.beaconClock.fromNow(nextSlot))

View File

@ -4,339 +4,199 @@
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import validator_client/[common, fallback_service, duties_service,
attestation_service, fork_service]
{.push raises: [Defect].}
import
# Standard library
std/[os, random, strutils],
# Nimble packages
stew/shims/[tables, macros],
chronos, confutils, metrics,
chronicles,
json_serialization/std/[options, net],
# Local modules
./spec/datatypes/[phase0, altair],
./spec/[digest, crypto, helpers, network, signatures],
./spec/eth2_apis/beacon_rpc_client,
./sync/sync_manager,
"."/[conf, beacon_clock, version],
./networking/[eth2_network, eth2_discovery],
./beacon_node_types,
./nimbus_binary_common,
./ssz/merkleization,
./spec/eth2_apis/callsigs_types,
./validators/[attestation_aggregation, keystore_management, validator_pool, slashing_protection],
./eth/db/[kvstore, kvstore_sqlite3],
./eth/keys, ./eth/p2p/discoveryv5/random2
logScope: topics = "vc"
type
ValidatorClient = ref object
config: ValidatorClientConf
graffitiBytes: GraffitiBytes
client: RpcHttpClient
beaconClock: BeaconClock
attachedValidators: ValidatorPool
fork: Fork
proposalsForCurrentEpoch: Table[Slot, ValidatorPubKey]
attestationsForEpoch: Table[Epoch, Table[Slot, seq[AttesterDuties]]]
beaconGenesis: BeaconGenesisTuple
template attemptUntilSuccess(vc: ValidatorClient, body: untyped) =
proc initGenesis*(vc: ValidatorClientRef): Future[RestBeaconGenesis] {.async.} =
info "Initializing genesis", nodes_count = len(vc.beaconNodes)
var nodes = vc.beaconNodes
while true:
var pending: seq[Future[RestResponse[DataRestBeaconGenesis]]]
for node in nodes:
debug "Requesting genesis information", endpoint = node
pending.add(node.client.getBeaconGenesis())
try:
body
break
except CatchableError as err:
warn "Caught an unexpected error", err = err.msg
waitFor sleepAsync(chronos.seconds(vc.config.retryDelay))
await allFutures(pending)
except CancelledError as exc:
warn "Unexpected cancellation interrupt"
raise exc
proc getValidatorDutiesForEpoch(vc: ValidatorClient, epoch: Epoch) {.gcsafe, async.} =
info "Getting validator duties for epoch", epoch = epoch
let (errorNodes, genesisList) =
block:
var gres: seq[RestBeaconGenesis]
var bres: seq[BeaconNodeServerRef]
for i in 0 ..< len(pending):
let fut = pending[i]
if fut.done():
let resp = fut.read()
if resp.status == 200:
debug "Received genesis information", endpoint = nodes[i],
genesis_time = resp.data.data.genesis_time,
genesis_fork_version = resp.data.data.genesis_fork_version,
genesis_root = resp.data.data.genesis_validators_root
gres.add(resp.data.data)
else:
debug "Received unsuccessfull response code", endpoint = nodes[i],
response_code = resp.status
bres.add(nodes[i])
elif fut.failed():
let error = fut.readError()
debug "Could not obtain genesis information from beacon node",
endpoint = nodes[i], error_name = error.name,
error_msg = error.msg
bres.add(nodes[i])
else:
debug "Interrupted while requesting information from beacon node",
endpoint = nodes[i]
bres.add(nodes[i])
(bres, gres)
let proposals = await vc.client.get_v1_validator_duties_proposer(epoch)
# update the block proposal duties this VC should do during this epoch
vc.proposalsForCurrentEpoch.clear()
for curr in proposals:
if vc.attachedValidators.validators.contains curr.public_key:
vc.proposalsForCurrentEpoch.add(curr.slot, curr.public_key)
if len(genesisList) == 0:
let sleepDuration = 2.seconds
info "Could not obtain network genesis information from nodes, repeating",
sleep_time = sleepDuration
await sleepAsync(sleepDuration)
nodes = errorNodes
else:
# Boyer-Moore majority vote algorithm
var melem: RestBeaconGenesis
var counter = 0
for item in genesisList:
if counter == 0:
melem = item
inc(counter)
else:
if melem == item:
inc(counter)
else:
dec(counter)
return melem
# couldn't use mapIt in ANY shape or form so reverting to raw loops - sorry Sean Parent :|
var validatorPubkeys: seq[ValidatorPubKey]
for key in vc.attachedValidators.validators.keys:
validatorPubkeys.add key
proc initValidators*(vc: ValidatorClientRef): Future[bool] {.async.} =
info "Initializaing validators", path = vc.config.validatorsDir()
var duplicates: seq[ValidatorPubKey]
for key in vc.config.validatorKeys():
let pubkey = key.toPubKey().toPubKey()
if pubkey in duplicates:
error "Duplicate validator's key found", validator_pubkey = pubkey
return false
else:
duplicates.add(pubkey)
vc.attachedValidators.addLocalValidator(key)
return true
proc getAttesterDutiesForEpoch(epoch: Epoch) {.gcsafe, async.} =
# make sure there's an entry
if not vc.attestationsForEpoch.contains epoch:
vc.attestationsForEpoch.add(epoch, Table[Slot, seq[AttesterDuties]]())
let attestations = await vc.client.get_v1_validator_duties_attester(
epoch, validatorPubkeys)
for a in attestations:
if vc.attestationsForEpoch[epoch].hasKeyOrPut(a.slot, @[a]):
vc.attestationsForEpoch[epoch][a.slot].add(a)
proc initClock*(vc: ValidatorClientRef): Future[BeaconClock] {.async.} =
# This procedure performs initialization of BeaconClock using current genesis
# information. It also performs waiting for genesis.
let res = BeaconClock.init(vc.beaconGenesis.genesis_time)
let currentSlot = res.now().slotOrZero()
let currentEpoch = currentSlot.epoch()
info "Initializing beacon clock",
genesis_time = vc.beaconGenesis.genesis_time,
current_slot = currentSlot, current_epoch = currentEpoch
let genesisTime = res.fromNow(toBeaconTime(Slot(0)))
if genesisTime.inFuture:
notice "Waiting for genesis", genesisIn = genesisTime.offset
await sleepAsync(genesisTime.offset)
return res
# clear both for the current epoch and the next because a change of
# fork could invalidate the attester duties even the current epoch
vc.attestationsForEpoch.clear()
await getAttesterDutiesForEpoch(epoch)
# obtain the attestation duties this VC should do during the next epoch
await getAttesterDutiesForEpoch(epoch + 1)
proc asyncInit*(vc: ValidatorClientRef) {.async.} =
vc.beaconGenesis = await vc.initGenesis()
info "Genesis information", genesis_time = vc.beaconGenesis.genesis_time,
genesis_fork_version = vc.beaconGenesis.genesis_fork_version,
genesis_root = vc.beaconGenesis.genesis_validators_root
# for now we will get the fork each time we update the validator duties for each epoch
# TODO should poll occasionally `/v1/config/fork_schedule`
vc.fork = await vc.client.get_v1_beacon_states_fork("head")
vc.beaconClock = await vc.initClock()
var numAttestationsForEpoch = 0
for _, dutiesForSlot in vc.attestationsForEpoch[epoch]:
numAttestationsForEpoch += dutiesForSlot.len
if not(await initValidators(vc)):
fatal "Could not initialize local validators"
info "Got validator duties for epoch",
num_proposals = vc.proposalsForCurrentEpoch.len,
num_attestations = numAttestationsForEpoch
info "Initializing slashing protection", path = vc.config.validatorsDir()
vc.attachedValidators.slashingProtection =
SlashingProtectionDB.init(
vc.beaconGenesis.genesis_validators_root,
vc.config.validatorsDir(), "slashing_protection"
)
proc onSlotStart(vc: ValidatorClient, lastSlot, scheduledSlot: Slot) {.gcsafe, async.} =
vc.fallbackService = await FallbackServiceRef.init(vc)
vc.forkService = await ForkServiceRef.init(vc)
vc.dutiesService = await DutiesServiceRef.init(vc)
vc.attestationService = await AttestationServiceRef.init(vc)
proc onSlotStart(vc: ValidatorClientRef, wallTime: BeaconTime,
lastSlot: Slot) {.async.} =
## Called at the beginning of a slot - usually every slot, but sometimes might
## skip a few in case we're running late.
## wallTime: current system time - we will strive to perform all duties up
## to this point in time
## lastSlot: the last slot that we successfully processed, so we know where to
## start work from - there might be jumps if processing is delayed
let
# The slot we should be at, according to the clock
beaconTime = vc.beaconClock.now()
wallSlot = beaconTime.toSlot()
beaconTime = wallTime
wallSlot = wallTime.toSlot()
let
slot = wallSlot.slot # afterGenesis == true!
nextSlot = slot + 1
epoch = slot.compute_epoch_at_slot
# If everything was working perfectly, the slot that we should be processing
expectedSlot = lastSlot + 1
delay = wallTime - expectedSlot.toBeaconTime()
checkIfShouldStopAtEpoch(wallSlot.slot, vc.config.stopAtEpoch)
info "Slot start",
lastSlot = shortLog(lastSlot),
scheduledSlot = shortLog(scheduledSlot),
beaconTime = shortLog(beaconTime),
portBN = vc.config.rpcPort
wallSlot = shortLog(wallSlot.slot),
delay = shortLog(delay),
attestationIn = vc.getDurationToNextAttestation(wallSlot.slot),
blockIn = vc.getDurationToNextBlock(wallSlot.slot)
# Check before any re-scheduling of onSlotStart()
checkIfShouldStopAtEpoch(scheduledSlot, vc.config.stopAtEpoch)
proc asyncRun*(vc: ValidatorClientRef) {.async.} =
vc.fallbackService.start()
vc.forkService.start()
vc.dutiesService.start()
vc.attestationService.start()
try:
# at the start of each epoch - request all validator duties
# TODO perhaps call this not on the first slot of each Epoch but perhaps
# 1 slot earlier because there are a few back-and-forth requests which
# could take up time for attesting... Perhaps this should be called more
# than once per epoch because of forks & other events...
#
# calling it before epoch n starts means one can't ensure knowing about
# epoch n+1.
if slot.isEpoch:
await getValidatorDutiesForEpoch(vc, epoch)
# check if we have a validator which needs to propose on this slot
if vc.proposalsForCurrentEpoch.contains slot:
let public_key = vc.proposalsForCurrentEpoch[slot]
notice "Proposing block", slot = slot, public_key = public_key
let validator = vc.attachedValidators.validators[public_key]
let randao_reveal = await validator.genRandaoReveal(
vc.fork, vc.beaconGenesis.genesis_validators_root, slot)
var newBlock = phase0.SignedBeaconBlock(
message: await vc.client.get_v1_validator_block(slot, vc.graffitiBytes, randao_reveal)
)
newBlock.root = hash_tree_root(newBlock.message)
# TODO: signing_root is recomputed in signBlockProposal just after
let signing_root = compute_block_root(vc.fork, vc.beaconGenesis.genesis_validators_root, slot, newBlock.root)
let notSlashable = vc.attachedValidators
.slashingProtection
.registerBlock(
newBlock.message.proposer_index.ValidatorIndex, public_key, slot,
signing_root)
if notSlashable.isOk:
newBlock.signature = await validator.signBlockProposal(
vc.fork, vc.beaconGenesis.genesis_validators_root, slot, newBlock.root)
discard await vc.client.post_v1_validator_block(newBlock)
else:
warn "Slashing protection activated for block proposal",
validator = public_key,
slot = slot,
existingProposal = notSlashable.error
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#attesting
# A validator should create and broadcast the attestation to the associated
# attestation subnet when either (a) the validator has received a valid
# block from the expected block proposer for the assigned slot or
# (b) one-third of the slot has transpired (`SECONDS_PER_SLOT / 3` seconds
# after the start of slot) -- whichever comes first.
discard await vc.beaconClock.sleepToSlotOffset(
seconds(int64(SECONDS_PER_SLOT)) div 3, slot, "Waiting to send attestations")
# check if we have validators which need to attest on this slot
if vc.attestationsForEpoch.contains(epoch) and
vc.attestationsForEpoch[epoch].contains slot:
var validatorToAttestationDataRoot: Table[ValidatorPubKey, Eth2Digest]
for a in vc.attestationsForEpoch[epoch][slot]:
let validator = vc.attachedValidators.validators[a.public_key]
let ad = await vc.client.get_v1_validator_attestation_data(slot, a.committee_index)
# TODO signing_root is recomputed in produceAndSignAttestation/signAttestation just after
let signing_root = compute_attestation_root(
vc.fork, vc.beaconGenesis.genesis_validators_root, ad)
let notSlashable = vc.attachedValidators
.slashingProtection
.registerAttestation(
a.validator_index, a.public_key, ad.source.epoch, ad.target.epoch, signing_root)
if notSlashable.isOk():
# TODO I don't like these (u)int64-to-int conversions...
let attestation = await validator.produceAndSignAttestation(
ad, a.committee_length.int, a.validator_committee_index,
vc.fork, vc.beaconGenesis.genesis_validators_root)
notice "Sending attestation to beacon node",
public_key = a.public_key, attestation = shortLog(attestation)
let ok = await vc.client.post_v1_beacon_pool_attestations(attestation)
if not ok:
warn "Failed to send attestation to beacon node",
public_key = a.public_key, attestation = shortLog(attestation)
validatorToAttestationDataRoot[a.public_key] = attestation.data.hash_tree_root
else:
warn "Slashing protection activated for attestation",
validator = a.public_key,
badVoteDetails = $notSlashable.error
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#broadcast-aggregate
# If the validator is selected to aggregate (is_aggregator), then they
# broadcast their best aggregate as a SignedAggregateAndProof to the global
# aggregate channel (beacon_aggregate_and_proof) two-thirds of the way
# through the slot-that is, SECONDS_PER_SLOT * 2 / 3 seconds after the start
# of slot.
if slot > 2:
discard await vc.beaconClock.sleepToSlotOffset(
seconds(int64(SECONDS_PER_SLOT * 2)) div 3, slot,
"Waiting to aggregate attestations")
# loop again over all of our validators which need to attest on
# this slot and check if we should also aggregate attestations
for a in vc.attestationsForEpoch[epoch][slot]:
let validator = vc.attachedValidators.validators[a.public_key]
let slot_signature = await getSlotSig(validator, vc.fork,
vc.beaconGenesis.genesis_validators_root, slot)
if is_aggregator(a.committee_length, slot_signature) and
validatorToAttestationDataRoot.contains(a.public_key):
notice "Aggregating", slot = slot, public_key = a.public_key
let aa = await vc.client.get_v1_validator_aggregate_attestation(
slot, validatorToAttestationDataRoot[a.public_key])
let aap = AggregateAndProof(aggregator_index: a.validator_index.uint64,
aggregate: aa, selection_proof: slot_signature)
let sig = await signAggregateAndProof(validator,
aap, vc.fork, vc.beaconGenesis.genesis_validators_root)
var signedAP = SignedAggregateAndProof(message: aap, signature: sig)
discard await vc.client.post_v1_validator_aggregate_and_proofs(signedAP)
except CatchableError as err:
warn "Caught an unexpected error", err = err.msg, slot = shortLog(slot)
let
nextSlotStart = saturate(vc.beaconClock.fromNow(nextSlot))
info "Slot end",
slot = shortLog(slot),
nextSlot = shortLog(nextSlot),
portBN = vc.config.rpcPort
when declared(GC_fullCollect):
# The slots in the validator client work as frames in a game: we want to make
# sure that we're ready for the next one and don't get stuck in lengthy
# garbage collection tasks when time is of essence in the middle of a slot -
# while this does not guarantee that we'll never collect during a slot, it
# makes sure that all the scratch space we used during slot tasks (logging,
# temporary buffers etc) gets recycled for the next slot that is likely to
# need similar amounts of memory.
GC_fullCollect()
if (slot - 2).isEpoch and (slot.epoch + 1) in vc.attestationsForEpoch:
for slot, attesterDuties in vc.attestationsForEpoch[slot.epoch + 1].pairs:
for ad in attesterDuties:
let
validator = vc.attachedValidators.validators[ad.public_key]
sig = await validator.getSlotSig(
vc.fork, vc.beaconGenesis.genesis_validators_root, slot)
discard await vc.client.post_v1_validator_beacon_committee_subscriptions(
ad.committee_index, ad.slot, true, ad.public_key, sig)
addTimer(nextSlotStart) do (p: pointer):
asyncCheck vc.onSlotStart(slot, nextSlot)
{.pop.} # TODO moduletests exceptions
await runSlotLoop(vc, vc.beaconClock.now(), onSlotStart)
programMain:
let config = makeBannerAndConfig("Nimbus validator client " & fullVersionStr, ValidatorClientConf)
let config = makeBannerAndConfig("Nimbus validator client " & fullVersionStr,
ValidatorClientConf)
setupStdoutLogging(config.logLevel)
setupLogging(config.logLevel, config.logFile)
# Doesn't use std/random directly, but dependencies might
let rng = keys.newRng()
if rng.isNil:
randomize()
else:
randomize(rng[].rand(high(int)))
case config.cmd
of VCNoCommand:
debug "Launching validator client",
version = fullVersionStr,
cmdParams = commandLineParams(),
config
of VCNoCommand:
let beaconNodes =
block:
var servers: seq[BeaconNodeServerRef]
let flags = {RestClientFlag.CommaSeparatedArray}
for url in config.beaconNodes:
let res = RestClientRef.new(url, flags = flags)
if res.isErr():
warn "Unable to resolve remote beacon node server's hostname",
url = url
else:
servers.add(BeaconNodeServerRef(client: res.get(), endpoint: url))
servers
var vc = ValidatorClient(
config: config,
client: newRpcHttpClient(),
graffitiBytes: if config.graffiti.isSome: config.graffiti.get
else: defaultGraffitiBytes())
if len(beaconNodes) == 0:
fatal "Not enough beacon nodes in command line"
quit 1
# load all the validators from the data dir into memory
for curr in vc.config.validatorKeys:
vc.attachedValidators.addLocalValidator(
curr.toPubKey, curr, none(ValidatorIndex))
debug "Launching validator client", version = fullVersionStr,
cmdParams = commandLineParams(),
config,
beacon_nodes_count = len(beaconNodes)
waitFor vc.client.connect($vc.config.rpcAddress, vc.config.rpcPort)
info "Connected to BN",
port = vc.config.rpcPort,
address = vc.config.rpcAddress
vc.attemptUntilSuccess:
# init the beacon clock
vc.beaconGenesis = waitFor vc.client.get_v1_beacon_genesis()
vc.beaconClock = BeaconClock.init(vc.beaconGenesis.genesis_time)
vc.attachedValidators.slashingProtection =
SlashingProtectionDB.init(
vc.beaconGenesis.genesis_validators_root,
config.validatorsDir(), "slashing_protection"
var vc = ValidatorClientRef(
config: config,
beaconNodes: beaconNodes,
graffitiBytes: config.graffiti.get(defaultGraffitiBytes()),
nodesAvailable: newAsyncEvent(),
)
let
curSlot = vc.beaconClock.now().slotOrZero()
nextSlot = curSlot + 1 # No earlier than GENESIS_SLOT + 1
fromNow = saturate(vc.beaconClock.fromNow(nextSlot))
vc.attemptUntilSuccess:
waitFor vc.getValidatorDutiesForEpoch(curSlot.compute_epoch_at_slot)
info "Scheduling first slot action",
beaconTime = shortLog(vc.beaconClock.now()),
nextSlot = shortLog(nextSlot),
fromNow = shortLog(fromNow)
addTimer(fromNow) do (p: pointer) {.gcsafe.}:
asyncCheck vc.onSlotStart(curSlot, nextSlot)
runForever()
waitFor asyncInit(vc)
waitFor asyncRun(vc)

View File

@ -22,27 +22,7 @@ logScope: topics = "rest_beaconapi"
const
# https://github.com/ethereum/eth2.0-APIs/blob/master/apis/beacon/states/validator_balances.yaml#L17
# https://github.com/ethereum/eth2.0-APIs/blob/master/apis/beacon/states/validators.yaml#L17
MaximumValidatorIds = 30
type
RestValidator* = object
index*: ValidatorIndex
balance*: string
status*: string
validator*: Validator
RestValidatorBalance* = object
index*: ValidatorIndex
balance*: string
RestBeaconStatesCommittees* = object
index*: CommitteeIndex
slot*: Slot
validators*: seq[ValidatorIndex]
RestAttestationsFailure* = object
index*: uint64
message*: string
MaximumValidatorIds* = 30
proc validateFilter(filters: seq[ValidatorFilter]): Result[ValidatorFilter,
cstring] =
@ -638,7 +618,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
node.network.broadcast(blocksTopic, blck)
return RestApiResponse.jsonError(Http202, BlockValidationError)
else:
return RestApiResponse.jsonError(Http200, BlockValidationSuccess)
return RestApiResponse.jsonMsgResponse(BlockValidationSuccess)
# https://ethereum.github.io/eth2.0-APIs/#/Beacon/getBlock
router.api(MethodGet, "/api/eth/v1/beacon/blocks/{block_id}") do (
@ -729,6 +709,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
var failures: seq[RestAttestationsFailure]
for atindex, attestation in attestations.pairs():
debug "Attestation for pool", attestation = attestation, signature = $attestation.signature
if not await node.sendAttestation(attestation):
failures.add(RestAttestationsFailure(
index: uint64(atindex), message: "Attestation failed validation"))
@ -737,7 +718,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
return RestApiResponse.jsonErrorList(Http400, AttestationValidationError,
failures)
else:
return RestApiResponse.jsonError(Http200, AttestationValidationSuccess)
return RestApiResponse.jsonMsgResponse(AttestationValidationSuccess)
# https://ethereum.github.io/eth2.0-APIs/#/Beacon/getPoolAttesterSlashings
router.api(MethodGet, "/api/eth/v1/beacon/pool/attester_slashings") do (
@ -771,7 +752,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
$vres.error())
res
node.sendAttesterSlashing(slashing)
return RestApiResponse.jsonError(Http200, AttesterSlashingValidationSuccess)
return RestApiResponse.jsonMsgResponse(AttesterSlashingValidationSuccess)
# https://ethereum.github.io/eth2.0-APIs/#/Beacon/getPoolProposerSlashings
router.api(MethodGet, "/api/eth/v1/beacon/pool/proposer_slashings") do (
@ -805,7 +786,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
$vres.error())
res
node.sendProposerSlashing(slashing)
return RestApiResponse.jsonError(Http200, ProposerSlashingValidationSuccess)
return RestApiResponse.jsonMsgResponse(ProposerSlashingValidationSuccess)
# https://ethereum.github.io/eth2.0-APIs/#/Beacon/getPoolVoluntaryExits
router.api(MethodGet, "/api/eth/v1/beacon/pool/voluntary_exits") do (
@ -839,7 +820,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
$vres.error())
res
node.sendVoluntaryExit(exit)
return RestApiResponse.jsonError(Http200, VoluntaryExitValidationSuccess)
return RestApiResponse.jsonMsgResponse(VoluntaryExitValidationSuccess)
router.redirect(
MethodGet,
@ -951,3 +932,38 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
"/eth/v1/beacon/pool/voluntary_exits",
"/api/eth/v1/beacon/pool/voluntary_exits"
)
proc getBeaconGenesis*(): RestResponse[DataRestBeaconGenesis] {.
rest, endpoint: "/eth/v1/beacon/genesis",
meth: MethodGet.}
## https://ethereum.github.io/eth2.0-APIs/#/Beacon/getGenesis
proc getStateFork*(state_id: StateIdent): RestResponse[DataRestFork] {.
rest, endpoint: "/eth/v1/beacon/states/{state_id}/fork",
meth: MethodGet.}
## https://ethereum.github.io/eth2.0-APIs/#/Beacon/getStateFork
proc publishBlock*(body: SignedBeaconBlock): RestPlainResponse {.
rest, endpoint: "/eth/v1/beacon/blocks",
meth: MethodPost.}
## https://ethereum.github.io/eth2.0-APIs/#/Beacon/publishBlock
proc getStateValidator*(state_id: StateIdent,
validator_id: ValidatorIdent
): RestResponse[DataRestValidator] {.
rest,
endpoint: "/eth/v1/beacon/states/{state_id}/validators/{validator_id}",
meth: MethodGet.}
## https://ethereum.github.io/eth2.0-APIs/#/Beacon/getStateValidator
proc getStateValidators*(state_id: StateIdent,
id: seq[ValidatorIdent]
): RestResponse[DataRestValidatorList] {.
rest, endpoint: "/eth/v1/beacon/states/{state_id}/validators",
meth: MethodGet.}
## https://ethereum.github.io/eth2.0-APIs/#/Beacon/getStateValidators
proc submitPoolAttestations*(body: seq[Attestation]): RestPlainResponse {.
rest, endpoint: "/eth/v1/beacon/pool/attestations",
meth: MethodPost.}
## https://ethereum.github.io/eth2.0-APIs/#/Beacon/submitPoolAttestations

View File

@ -18,7 +18,10 @@ import
logScope: topics = "rest_config"
func getDepositAddress(node: BeaconNode): string =
$node.runtimePreset.DEPOSIT_CONTRACT_ADDRESS
if isNil(node.eth1Monitor):
"0x0000000000000000000000000000000000000000"
else:
$node.eth1Monitor.depositContractAddress
proc installConfigApiHandlers*(router: var RestRouter, node: BeaconNode) =
router.api(MethodGet,
@ -180,3 +183,7 @@ proc installConfigApiHandlers*(router: var RestRouter, node: BeaconNode) =
"/eth/v1/config/deposit_contract",
"/api/eth/v1/config/deposit_contract"
)
proc getConfig*(): RestResponse[DataRestConfig] {.
rest, endpoint: "/eth/v1/config/spec", meth: MethodGet.}
## https://ethereum.github.io/eth2.0-APIs/#/Config/getSpec

View File

@ -1,19 +1,176 @@
import
std/[typetraits],
stew/[results, base10, byteutils],
stew/[results, base10, byteutils, endians2],
chronicles, presto,
faststreams/[outputs],
serialization, json_serialization,
nimcrypto/utils as ncrutils,
../beacon_node_common, ../networking/eth2_network,
../consensus_object_pools/[blockchain_dag, exit_pool],
../spec/[crypto, digest, datatypes],
../spec/[crypto, digest, datatypes, eth2_apis/callsigs_types],
../ssz/merkleization,
rest_utils
export json_serialization
Json.createFlavor RestJson
type
RestAttesterDuty* = object
pubkey*: ValidatorPubKey
validator_index*: ValidatorIndex
committee_index*: CommitteeIndex
committee_length*: uint64
committees_at_slot*: uint64
validator_committee_index*: ValidatorIndex
slot*: Slot
RestProposerDuty* = object
pubkey*: ValidatorPubKey
validator_index*: ValidatorIndex
slot*: Slot
RestCommitteeSubscription* = object
validator_index*: ValidatorIndex
committee_index*: CommitteeIndex
committees_at_slot*: uint64
slot*: Slot
is_aggregator*: bool
RestBeaconGenesis* = object
genesis_time*: uint64
genesis_validators_root*: Eth2Digest
genesis_fork_version*: Version
RestValidatorBalance* = object
index*: ValidatorIndex
balance*: string
RestBeaconStatesCommittees* = object
index*: CommitteeIndex
slot*: Slot
validators*: seq[ValidatorIndex]
RestAttestationsFailure* = object
index*: uint64
message*: string
RestValidator* = object
index*: ValidatorIndex
balance*: string
status*: string
validator*: Validator
RestVersion* = object
version*: string
RestSyncInfo* = object
head_slot*: Slot
sync_distance*: uint64
is_syncing*: bool
RestConfig* = object
CONFIG_NAME*: string
MAX_COMMITTEES_PER_SLOT*: uint64
TARGET_COMMITTEE_SIZE*: uint64
MAX_VALIDATORS_PER_COMMITTEE*: uint64
MIN_PER_EPOCH_CHURN_LIMIT*: uint64
CHURN_LIMIT_QUOTIENT*: uint64
SHUFFLE_ROUND_COUNT*: uint64
MIN_GENESIS_ACTIVE_VALIDATOR_COUNT*: uint64
MIN_GENESIS_TIME*: uint64
HYSTERESIS_QUOTIENT*: uint64
HYSTERESIS_DOWNWARD_MULTIPLIER*: uint64
HYSTERESIS_UPWARD_MULTIPLIER*: uint64
SAFE_SLOTS_TO_UPDATE_JUSTIFIED*: uint64
ETH1_FOLLOW_DISTANCE*: uint64
TARGET_AGGREGATORS_PER_COMMITTEE*: uint64
RANDOM_SUBNETS_PER_VALIDATOR*: uint64
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION*: uint64
SECONDS_PER_ETH1_BLOCK*: uint64
DEPOSIT_CHAIN_ID*: uint64
DEPOSIT_NETWORK_ID*: uint64
DEPOSIT_CONTRACT_ADDRESS*: Eth1Address
MIN_DEPOSIT_AMOUNT*: uint64
MAX_EFFECTIVE_BALANCE*: uint64
EJECTION_BALANCE*: uint64
EFFECTIVE_BALANCE_INCREMENT*: uint64
GENESIS_FORK_VERSION*: Version
BLS_WITHDRAWAL_PREFIX*: byte
GENESIS_DELAY*: uint64
SECONDS_PER_SLOT*: uint64
MIN_ATTESTATION_INCLUSION_DELAY*: uint64
SLOTS_PER_EPOCH*: uint64
MIN_SEED_LOOKAHEAD*: uint64
MAX_SEED_LOOKAHEAD*: uint64
EPOCHS_PER_ETH1_VOTING_PERIOD*: uint64
SLOTS_PER_HISTORICAL_ROOT*: uint64
MIN_VALIDATOR_WITHDRAWABILITY_DELAY*: uint64
SHARD_COMMITTEE_PERIOD*: uint64
MIN_EPOCHS_TO_INACTIVITY_PENALTY*: uint64
EPOCHS_PER_HISTORICAL_VECTOR*: uint64
EPOCHS_PER_SLASHINGS_VECTOR*: uint64
HISTORICAL_ROOTS_LIMIT*: uint64
VALIDATOR_REGISTRY_LIMIT*: uint64
BASE_REWARD_FACTOR*: uint64
WHISTLEBLOWER_REWARD_QUOTIENT*: uint64
PROPOSER_REWARD_QUOTIENT*: uint64
INACTIVITY_PENALTY_QUOTIENT*: uint64
MIN_SLASHING_PENALTY_QUOTIENT*: uint64
PROPORTIONAL_SLASHING_MULTIPLIER*: uint64
MAX_PROPOSER_SLASHINGS*: uint64
MAX_ATTESTER_SLASHINGS*: uint64
MAX_ATTESTATIONS*: uint64
MAX_DEPOSITS*: uint64
MAX_VOLUNTARY_EXITS*: uint64
DOMAIN_BEACON_PROPOSER*: DomainType
DOMAIN_BEACON_ATTESTER*: DomainType
DOMAIN_RANDAO*: DomainType
DOMAIN_DEPOSIT*: DomainType
DOMAIN_VOLUNTARY_EXIT*: DomainType
DOMAIN_SELECTION_PROOF*: DomainType
DOMAIN_AGGREGATE_AND_PROOF*: DomainType
RestGenericError* = object
code*: uint64
message*: string
stacktraces*: Option[seq[string]]
RestAttestationError* = object
code*: uint64
message*: string
failures*: seq[RestAttestationsFailure]
DataEnclosedObject*[T] = object
data*: T
DataRootEnclosedObject*[T] = object
dependent_root*: Eth2Digest
data*: T
DataRestBeaconGenesis* = DataEnclosedObject[RestBeaconGenesis]
DataRestFork* = DataEnclosedObject[Fork]
DataRestProposerDuties* = DataRootEnclosedObject[seq[RestProposerDuty]]
DataRestAttesterDuties* = DataRootEnclosedObject[seq[RestAttesterDuty]]
DataRestBeaconBlock* = DataEnclosedObject[BeaconBlock]
DataRestAttestationData* = DataEnclosedObject[AttestationData]
DataRestAttestation* = DataEnclosedObject[Attestation]
DataRestSyncInfo* = DataEnclosedObject[RestSyncInfo]
DataRestValidator* = DataEnclosedObject[RestValidator]
DataRestValidatorList* = DataEnclosedObject[seq[RestValidator]]
DataRestVersion* = DataEnclosedObject[RestVersion]
DataRestConfig* = DataEnclosedObject[RestConfig]
EncodeTypes* = SignedBeaconBlock
EncodeArrays* = seq[ValidatorIndex] | seq[Attestation] |
seq[SignedAggregateAndProof] | seq[RestCommitteeSubscription]
DecodeTypes* = DataRestBeaconGenesis | DataRestFork | DataRestProposerDuties |
DataRestAttesterDuties | DataRestBeaconBlock |
DataRestAttestationData | DataRestAttestation |
DataRestSyncInfo | DataRestValidator |
DataRestValidatorList | DataRestVersion |
DataRestConfig | RestGenericError | RestAttestationError
proc jsonResponseWRoot*(t: typedesc[RestApiResponse],
data: auto,
dependent_root: Eth2Digest): RestApiResponse =
@ -47,8 +204,57 @@ proc jsonResponseWMeta*(t: typedesc[RestApiResponse],
RestApiResponse.response(stream.getOutput(seq[byte]), Http200,
"application/json")
proc jsonMsgResponse*(t: typedesc[RestApiResponse],
msg: string = ""): RestApiResponse =
let data =
block:
var default: seq[string]
var stream = memoryOutput()
var writer = JsonWriter[RestJson].init(stream)
writer.beginRecord()
writer.writeField("code", "200")
writer.writeField("message", msg)
writer.writeField("stacktrace", default)
writer.endRecord()
stream.getOutput(seq[byte])
RestApiResponse.response(data, Http200, "application/json")
proc jsonError*(t: typedesc[RestApiResponse], status: HttpCode = Http200,
msg: string = "", stacktrace: string = ""): RestApiResponse =
msg: string = ""): RestApiResponse =
let data =
block:
var default: seq[string]
var stream = memoryOutput()
var writer = JsonWriter[RestJson].init(stream)
writer.beginRecord()
writer.writeField("code", Base10.toString(uint64(status.toInt())))
writer.writeField("message", msg)
writer.writeField("stacktrace", default)
writer.endRecord()
stream.getOutput(string)
RestApiResponse.error(status, data, "application/json")
proc jsonError*(t: typedesc[RestApiResponse], status: HttpCode = Http200,
msg: string = "", stacktrace: string): RestApiResponse =
let data =
block:
var default: seq[string]
var stream = memoryOutput()
var writer = JsonWriter[RestJson].init(stream)
writer.beginRecord()
writer.writeField("code", Base10.toString(uint64(status.toInt())))
writer.writeField("message", msg)
if len(stacktrace) > 0:
writer.writeField("stacktrace", [stacktrace])
else:
writer.writeField("stacktrace", default)
writer.endRecord()
stream.getOutput(string)
RestApiResponse.error(status, data, "application/json")
proc jsonError*(t: typedesc[RestApiResponse], status: HttpCode = Http200,
msg: string = "",
stacktraces: openarray[string]): RestApiResponse =
let data =
block:
var stream = memoryOutput()
@ -56,8 +262,7 @@ proc jsonError*(t: typedesc[RestApiResponse], status: HttpCode = Http200,
writer.beginRecord()
writer.writeField("code", Base10.toString(uint64(status.toInt())))
writer.writeField("message", msg)
if len(stacktrace) > 0:
writer.writeField("stacktrace", stacktrace)
writer.writeField("stacktrace", stacktraces)
writer.endRecord()
stream.getOutput(string)
RestApiResponse.error(status, data, "application/json")
@ -93,6 +298,40 @@ proc readValue*(reader: var JsonReader[RestJson], value: var uint64) {.
else:
reader.raiseUnexpectedValue($res.error())
## byte
proc writeValue*(w: var JsonWriter[RestJson], value: byte) =
var data: array[1, byte]
data[0] = value
writeValue(w, hexOriginal(data))
proc readValue*(reader: var JsonReader[RestJson], value: var byte) {.
raises: [IOError, SerializationError, Defect].} =
var data: array[1, byte]
try:
hexToByteArray(reader.readValue(string), data)
value = data[0]
except ValueError:
raiseUnexpectedValue(reader,
"byte value should be a valid hex string")
## DomainType
proc writeValue*(w: var JsonWriter[RestJson], value: DomainType) =
writeValue(w, hexOriginal(uint32(value).toBytesLE()))
proc readValue*(reader: var JsonReader[RestJson], value: var DomainType) {.
raises: [IOError, SerializationError, Defect].} =
var data: array[4, byte]
try:
hexToByteArray(reader.readValue(string), data)
let res = uint32.fromBytesLE(data)
if res >= uint32(low(DomainType)) and res <= uint32(high(DomainType)):
value = cast[DomainType](res)
else:
raiseUnexpectedValue(reader, "Incorrect DomainType value")
except ValueError:
raiseUnexpectedValue(reader,
"DomainType value should be a valid hex string")
## Slot
proc writeValue*(writer: var JsonWriter[RestJson], value: Slot) {.
raises: [IOError, Defect].} =
@ -354,3 +593,77 @@ RestJson.useCustomSerialization(BeaconState.justification_bits):
"The `justification_bits` value must be a hex string")
write:
writer.writeValue "0x" & toHex([value])
proc encodeBytes*[T: EncodeTypes](value: T,
contentType: string): RestResult[seq[byte]] =
case contentType
of "application/json":
var stream = memoryOutput()
var writer = JsonWriter[RestJson].init(stream)
writer.writeValue(value)
ok(stream.getOutput(seq[byte]))
else:
err("Content-Type not supported")
proc encodeBytes*[T: EncodeArrays](value: T,
contentType: string): RestResult[seq[byte]] =
case contentType
of "application/json":
var stream = memoryOutput()
var writer = JsonWriter[RestJson].init(stream)
writer.writeArray(value)
ok(stream.getOutput(seq[byte]))
else:
err("Content-Type not supported")
proc decodeBytes*[T: DecodeTypes](t: typedesc[T], value: openarray[byte],
contentType: string): RestResult[T] =
case contentType
of "application/json":
let res =
try:
RestJson.decode(value, T)
except SerializationError:
return err("Serialization error")
ok(res)
else:
err("Content-Type not supported")
proc encodeString*(value: string): RestResult[string] =
ok(value)
proc encodeString*(value: Epoch|Slot|CommitteeIndex): RestResult[string] =
ok(Base10.toString(uint64(value)))
proc encodeString*(value: ValidatorSig): RestResult[string] =
ok(hexOriginal(toRaw(value)))
proc encodeString*(value: GraffitiBytes): RestResult[string] =
ok(hexOriginal(distinctBase(value)))
proc encodeString*(value: Eth2Digest): RestResult[string] =
ok(hexOriginal(value.data))
proc encodeString*(value: ValidatorIdent): RestResult[string] =
case value.kind
of ValidatorQueryKind.Index:
ok(Base10.toString(uint64(value.index)))
of ValidatorQueryKind.Key:
ok(hexOriginal(toRaw(value.key)))
proc encodeString*(value: StateIdent): RestResult[string] =
case value.kind
of StateQueryKind.Slot:
ok(Base10.toString(uint64(value.slot)))
of StateQueryKind.Root:
ok(hexOriginal(value.root.data))
of StateQueryKind.Named:
case value.value
of StateIdentType.Head:
ok("head")
of StateIdentType.Genesis:
ok("genesis")
of StateIdentType.Finalized:
ok("finalized")
of StateIdentType.Justified:
ok("justified")

View File

@ -291,3 +291,13 @@ proc installNodeApiHandlers*(router: var RestRouter, node: BeaconNode) =
"/eth/v1/node/health",
"/api/eth/v1/node/health"
)
proc getSyncingStatus*(): RestResponse[DataRestSyncInfo] {.
rest, endpoint: "/eth/v1/node/syncing",
meth: MethodGet.}
## https://ethereum.github.io/eth2.0-APIs/#/Node/getSyncingStatus
proc getVersion*(): RestResponse[DataRestVersion] {.
rest, endpoint: "/eth/v1/node/version",
meth: MethodGet.}
## https://ethereum.github.io/eth2.0-APIs/#/Node/getNodeVersion

View File

@ -1,4 +1,4 @@
import presto,
import presto, presto/client as presto_client,
libp2p/peerid,
stew/[base10, byteutils],
faststreams/[outputs],
@ -7,7 +7,7 @@ import presto,
../spec/[crypto, datatypes, digest, forkedbeaconstate_helpers],
../beacon_node_common,
../consensus_object_pools/[block_pools_types, blockchain_dag]
export blockchain_dag, presto
export blockchain_dag, presto, presto_client
const
DecimalSet = {'0' .. '9'}
@ -75,6 +75,8 @@ const
"rejected"
AggregateAndProofValidationSuccess* =
"Aggregate and proof object(s) was broadcasted"
BeaconCommitteeSubscriptionSuccess* =
"Beacon node processed committee subscription request"
InvalidParentRootValueError* =
"Invalid parent root value"
MissingSlotValueError* =
@ -567,3 +569,18 @@ proc toValidatorIndex*(value: RestValidatorIndex): Result[ValidatorIndex,
err(ValidatorIndexError.TooHighValue)
else:
doAssert(false, "ValidatorIndex type size is incorrect")
proc init*(t: typedesc[StateIdent], v: StateIdentType): StateIdent =
StateIdent(kind: StateQueryKind.Named, value: v)
proc init*(t: typedesc[StateIdent], v: Slot): StateIdent =
StateIdent(kind: StateQueryKind.Slot, slot: v)
proc init*(t: typedesc[StateIdent], v: Eth2Digest): StateIdent =
StateIdent(kind: StateQueryKind.Root, root: v)
proc init*(t: typedesc[ValidatorIdent], v: ValidatorIndex): ValidatorIdent =
ValidatorIdent(kind: ValidatorQueryKind.Index, index: RestValidatorIndex(v))
proc init*(t: typedesc[ValidatorIdent], v: ValidatorPubKey): ValidatorIdent =
ValidatorIdent(kind: ValidatorQueryKind.Key, key: v)

View File

@ -19,28 +19,6 @@ import
logScope: topics = "rest_validatorapi"
type
RestAttesterDuty* = object
pubkey*: ValidatorPubKey
validator_index*: ValidatorIndex
committee_index*: CommitteeIndex
committee_length*: uint64
committees_at_slot*: uint64
validator_committee_index*: ValidatorIndex
slot*: Slot
RestProposerDuty* = object
pubkey*: ValidatorPubKey
validator_index*: ValidatorIndex
slot*: Slot
RestCommitteeSubscription* = object
validator_index*: ValidatorIndex
committee_index*: CommitteeIndex
committees_at_slot*: uint64
slot*: Slot
is_aggregator*: bool
proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
# https://ethereum.github.io/eth2.0-APIs/#/Validator/getAttesterDuties
router.api(MethodPost, "/api/eth/v1/validator/duties/attester/{epoch}") do (
@ -312,8 +290,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
$res.error())
node.network.broadcast(node.topicAggregateAndProofs, item)
return RestApiResponse.jsonError(Http200,
AggregateAndProofValidationSuccess)
return RestApiResponse.jsonMsgResponse(AggregateAndProofValidationSuccess)
# https://ethereum.github.io/eth2.0-APIs/#/Validator/prepareBeaconCommitteeSubnet
router.api(MethodPost,
@ -367,7 +344,8 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
get_committee_count_per_slot(epochRef), request.slot,
request.committee_index)
)
return RestApiResponse.jsonError(Http500, NoImplementationError)
warn "Beacon committee subscription request served, but not implemented"
return RestApiResponse.jsonMsgResponse(BeaconCommitteeSubscriptionSuccess)
router.redirect(
MethodPost,
@ -404,3 +382,47 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
"/eth/v1/validator/beacon_committee_subscriptions",
"/api/eth/v1/validator/beacon_committee_subscriptions"
)
proc getProposerDuties*(epoch: Epoch): RestResponse[DataRestProposerDuties] {.
rest, endpoint: "/eth/v1/validator/duties/proposer/{epoch}",
meth: MethodGet.}
## https://ethereum.github.io/eth2.0-APIs/#/Validator/getProposerDuties
proc getAttesterDuties*(epoch: Epoch,
body: seq[ValidatorIndex]
): RestResponse[DataRestAttesterDuties] {.
rest, endpoint: "/eth/v1/validator/duties/attester/{epoch}",
meth: MethodPost.}
## https://ethereum.github.io/eth2.0-APIs/#/Validator/getAttesterDuties
proc produceBlock*(slot: Slot, randao_reveal: ValidatorSig,
graffiti: GraffitiBytes
): RestResponse[DataRestBeaconBlock] {.
rest, endpoint: "/eth/v1/validator/blocks/{slot}",
meth: MethodGet.}
## https://ethereum.github.io/eth2.0-APIs/#/Validator/produceBlock
proc produceAttestationData*(slot: Slot,
committee_index: CommitteeIndex
): RestResponse[DataRestAttestationData] {.
rest, endpoint: "/eth/v1/validator/attestation_data",
meth: MethodGet.}
## https://ethereum.github.io/eth2.0-APIs/#/Validator/produceAttestationData
proc getAggregatedAttestation*(attestation_data_root: Eth2Digest,
slot: Slot): RestResponse[DataRestAttestation] {.
rest, endpoint: "/eth/v1/validator/aggregate_attestation"
meth: MethodGet.}
## https://ethereum.github.io/eth2.0-APIs/#/Validator/getAggregatedAttestation
proc publishAggregateAndProofs*(body: seq[SignedAggregateAndProof]
): RestPlainResponse {.
rest, endpoint: "/eth/v1/validator/aggregate_and_proofs",
meth: MethodPost.}
## https://ethereum.github.io/eth2.0-APIs/#/Validator/publishAggregateAndProofs
proc prepareBeaconCommitteeSubnet*(body: seq[RestCommitteeSubscription]
): RestPlainResponse {.
rest, endpoint: "/eth/v1/validator/beacon_committee_subscriptions",
meth: MethodPost.}
## https://ethereum.github.io/eth2.0-APIs/#/Validator/prepareBeaconCommitteeSubnet

View File

@ -0,0 +1,810 @@
import common
type
ApiResponse*[T] = Result[T, string]
ApiOperation = enum
Success, Timeout, Failure, Interrupt
proc checkCompatible*(vc: ValidatorClientRef,
node: BeaconNodeServerRef) {.async.} =
logScope: endpoint = node
let info =
try:
debug "Requesting beacon node network configuration"
let res = await node.client.getConfig()
res.data.data
except CancelledError as exc:
error "Configuration request was interrupted"
node.status = RestBeaconNodeStatus.Offline
raise exc
except RestError as exc:
error "Unable to obtain beacon node's configuration",
error_name = exc.name, error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
except CatchableError as exc:
error "Unexpected exception", error_name = exc.name,
error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
let genesis =
try:
debug "Requesting beacon node genesis information"
let res = await node.client.getBeaconGenesis()
res.data.data
except CancelledError as exc:
error "Genesis request was interrupted"
node.status = RestBeaconNodeStatus.Offline
raise exc
except RestError as exc:
error "Unable to obtain beacon node's genesis",
error_name = exc.name, error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
except CatchableError as exc:
error "Unexpected exception", error_name = exc.name,
error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
let genesisFlag = (genesis != vc.beaconGenesis)
let configFlag =
info.MAX_VALIDATORS_PER_COMMITTEE != MAX_VALIDATORS_PER_COMMITTEE or
info.SLOTS_PER_EPOCH != SLOTS_PER_EPOCH or
info.SECONDS_PER_SLOT != SECONDS_PER_SLOT or
info.EPOCHS_PER_ETH1_VOTING_PERIOD != EPOCHS_PER_ETH1_VOTING_PERIOD or
info.SLOTS_PER_HISTORICAL_ROOT != SLOTS_PER_HISTORICAL_ROOT or
info.EPOCHS_PER_HISTORICAL_VECTOR != EPOCHS_PER_HISTORICAL_VECTOR or
info.EPOCHS_PER_SLASHINGS_VECTOR != EPOCHS_PER_SLASHINGS_VECTOR or
info.HISTORICAL_ROOTS_LIMIT != HISTORICAL_ROOTS_LIMIT or
info.VALIDATOR_REGISTRY_LIMIT != VALIDATOR_REGISTRY_LIMIT or
info.MAX_PROPOSER_SLASHINGS != MAX_PROPOSER_SLASHINGS or
info.MAX_ATTESTER_SLASHINGS != MAX_ATTESTER_SLASHINGS or
info.MAX_ATTESTATIONS != MAX_ATTESTATIONS or
info.MAX_DEPOSITS != MAX_DEPOSITS or
info.MAX_VOLUNTARY_EXITS != MAX_VOLUNTARY_EXITS or
info.DOMAIN_BEACON_PROPOSER != DOMAIN_BEACON_PROPOSER or
info.DOMAIN_BEACON_ATTESTER != DOMAIN_BEACON_ATTESTER or
info.DOMAIN_RANDAO != DOMAIN_RANDAO or
info.DOMAIN_DEPOSIT != DOMAIN_DEPOSIT or
info.DOMAIN_VOLUNTARY_EXIT != DOMAIN_VOLUNTARY_EXIT or
info.DOMAIN_SELECTION_PROOF != DOMAIN_SELECTION_PROOF or
info.DOMAIN_AGGREGATE_AND_PROOF != DOMAIN_AGGREGATE_AND_PROOF
if configFlag or genesisFlag:
node.status = RestBeaconNodeStatus.Incompatible
warn "Beacon node has incompatible configuration",
genesis_flag = genesisFlag, config_flag = configFlag
else:
info "Beacon node has compatible configuration"
node.config = some(info)
node.genesis = some(genesis)
node.status = RestBeaconNodeStatus.Online
proc checkSync*(vc: ValidatorClientRef,
node: BeaconNodeServerRef) {.async.} =
logScope: endpoint = node
let syncInfo =
try:
debug "Requesting beacon node sync status"
let res = await node.client.getSyncingStatus()
res.data.data
except CancelledError as exc:
error "Sync status request was interrupted"
node.status = RestBeaconNodeStatus.Offline
raise exc
except RestError as exc:
error "Unable to obtain beacon node's sync status",
error_name = exc.name, error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
except CatchableError as exc:
error "Unexpected exception", error_name = exc.name,
error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
node.syncInfo = some(syncInfo)
node.status =
if not(syncInfo.is_syncing) or (syncInfo.sync_distance < SYNC_TOLERANCE):
info "Beacon node is in sync", sync_distance = syncInfo.sync_distance,
head_slot = syncInfo.head_slot
RestBeaconNodeStatus.Online
else:
warn "Beacon node not in sync", sync_distance = syncInfo.sync_distance,
head_slot = syncInfo.head_slot
RestBeaconNodeStatus.NotSynced
proc checkOnline*(node: BeaconNodeServerRef) {.async.} =
logScope: endpoint = node
debug "Checking beacon node status"
let agent =
try:
let res = await node.client.getVersion()
res.data.data
except CancelledError as exc:
error "Status request was interrupted"
node.status = RestBeaconNodeStatus.Offline
raise exc
except RestError as exc:
error "Unable to check beacon node's status",
error_name = exc.name, error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
except CatchableError as exc:
error "Unexpected exception", error_name = exc.name,
error_message = exc.msg
node.status = RestBeaconNodeStatus.Offline
return
debug "Beacon node has been identified", agent = agent.version
node.ident = some(agent.version)
node.status = RestBeaconNodeStatus.Online
proc checkNode*(vc: ValidatorClientRef,
node: BeaconNodeServerRef) {.async.} =
debug "Checking beacon node", endpoint = node
await node.checkOnline()
if node.status != RestBeaconNodeStatus.Online:
return
await vc.checkCompatible(node)
if node.status != RestBeaconNodeStatus.Online:
return
await vc.checkSync(node)
proc checkNodes*(vc: ValidatorClientRef,
nodeStatuses: set[RestBeaconNodeStatus]) {.async.} =
doAssert(RestBeaconNodeStatus.Online notin nodeStatuses)
let nodesToCheck =
vc.beaconNodes.filterIt(it.status in nodeStatuses)
let pending =
block:
var res: seq[Future[void]]
for node in nodesToCheck:
res.add(vc.checkNode(node))
res
if len(pending) > 0:
try:
await allFutures(pending)
except CancelledError as exc:
# allFutures() did not cancel passed Futures, so we need to send
# cancellation to all the children.
for fut in pending:
if not(fut.finished()):
fut.cancel()
await allFutures(pending)
raise exc
template onceToAll*(vc: ValidatorClientRef, responseType: typedesc,
timeout: Duration, body: untyped,
handlers: untyped): untyped =
var it {.inject.}: RestClientRef
var operationResult {.inject.}: bool = false
type BodyType = typeof(body)
let onlineNodes =
vc.beaconNodes.filterIt(it.status == RestBeaconNodeStatus.Online)
if len(onlineNodes) > 0:
var pending =
block:
var res: seq[BodyType]
for node {.inject.} in onlineNodes:
it = node.client
it = node.client
let fut = body
res.add(fut)
res
let opres =
try:
await allFutures(pending).wait(timeout)
ApiOperation.Success
except AsyncTimeoutError:
ApiOperation.Timeout
except CancelledError:
ApiOperation.Interrupt
for idx, node {.inject.} in onlineNodes.pairs():
it = node.client
let apiResponse {.inject.} =
block:
let fut = pending[idx]
if fut.finished():
if fut.failed() or fut.cancelled():
let exc = fut.readError()
ApiResponse[responseType].err("[" & $exc.name & "] " & $exc.msg)
else:
ApiResponse[responseType].ok(fut.read())
else:
case opres
of ApiOperation.Interrupt:
fut.cancel()
onlineNodes[idx].status = RestBeaconNodeStatus.Offline
ApiResponse[responseType].err("Operation interrupted")
of ApiOperation.Timeout:
fut.cancel()
onlineNodes[idx].status = RestBeaconNodeStatus.Offline
ApiResponse[responseType].err("Operation timeout exceeded")
of ApiOperation.Success, ApiOperation.Failure:
# This should not be happened, because all Futures should be
# finished, and `Failure` processed when Future is finished.
ApiResponse[responseType].err("Unexpected error")
node.status = handlers
if node.status == RestBeaconNodeStatus.Online:
operationResult = true
template firstSuccessTimeout*(vc: ValidatorClientRef, respType: typedesc,
timeout: Duration, body: untyped,
handlers: untyped): untyped =
doAssert(timeout != ZeroDuration)
var it {.inject.}: RestClientRef
var timerFut =
if timeout != InfiniteDuration:
sleepAsync(timeout)
else:
nil
var iterationsCount = 0
while true:
let onlineNodes =
vc.beaconNodes.filterIt(it.status == RestBeaconNodeStatus.Online)
if iterationsCount != 0:
debug "Request got failed", iterations_count = iterationsCount
var exitNow = false
for node {.inject.} in onlineNodes:
it = node.client
var bodyFut = body
let resOp =
block:
if isNil(timerFut):
try:
# We use `allFutures()` to keep result in `bodyFut`, but still
# be able to check errors.
await allFutures(bodyFut)
ApiOperation.Success
except CancelledError:
# `allFutures()` could not cancel Futures.
await bodyFut.cancelAndWait()
ApiOperation.Interrupt
except CatchableError as exc:
# This only could happened if `allFutures()` start raise
# exceptions.
ApiOperation.Failure
else:
try:
discard await race(bodyFut, timerFut)
if bodyFut.finished():
ApiOperation.Success
else:
await bodyFut.cancelAndWait()
ApiOperation.Timeout
except CancelledError:
# `race()` could not cancel Futures.
if not(bodyFut.finished()):
if not(timerFut.finished()):
timerFut.cancel()
await allFutures(bodyFut.cancelAndWait(), timerFut)
else:
await cancelAndWait(timerFut)
ApiOperation.Interrupt
except CatchableError as exc:
# This only could happened if `race()` start raise exceptions.
ApiOperation.Failure
block:
let apiResponse {.inject.} =
block:
if bodyFut.finished():
if bodyFut.failed() or bodyFut.cancelled():
let exc = bodyFut.readError()
ApiResponse[respType].err("[" & $exc.name & "] " & $exc.msg)
else:
ApiResponse[respType].ok(bodyFut.read())
else:
case resOp
of ApiOperation.Interrupt:
ApiResponse[respType].err("Operation was interrupted")
of ApiOperation.Timeout:
ApiResponse[respType].err("Operation timeout exceeded")
of ApiOperation.Success, ApiOperation.Failure:
# This should not be happened, because all Futures should be
# finished, and `Failure` processed when Future is finished.
ApiResponse[respType].err("Unexpected error")
let status =
try:
handlers
except CatchableError:
raiseAssert("Response handler must not raise exceptions")
node.status = status
if resOp == ApiOperation.Success:
if node.status == RestBeaconNodeStatus.Online:
exitNow = true
break
else:
exitNow = true
break
if exitNow:
break
let offlineMask = {RestBeaconNodeStatus.Offline,
RestBeaconNodeStatus.NotSynced,
RestBeaconNodeStatus.Uninitalized}
let offlineNodes = vc.beaconNodes.filterIt(it.status in offlineMask)
warn "There no beacon nodes available, refreshing nodes status",
online_nodes = len(onlineNodes), offline_nodes = len(offlineNodes)
var checkFut = vc.checkNodes(offlineMask)
let checkOp =
block:
var dontRushFut = sleepAsync(500.milliseconds)
if isNil(timerFut):
try:
# We use `allFutures()` to keep result in `checkFut`, but still
# be able to check errors.
await allFutures(checkFut, dontRushFut)
ApiOperation.Success
except CancelledError:
# `allFutures()` could not cancel Futures.
if not(checkFut.finished()):
checkFut.cancel()
if not(dontRushFut.finished()):
dontRushFut.cancel()
await allFutures(checkFut, dontRushFut)
ApiOperation.Interrupt
except CatchableError as exc:
# This only could happened if `race()` or `allFutures()` start raise
# exceptions.
ApiOperation.Failure
else:
try:
discard await race(allFutures(checkFut, dontRushFut), timerFut)
if checkFut.finished():
ApiOperation.Success
else:
checkFut.cancel()
if not(dontRushFut.finished()):
dontRushFut.cancel()
await allFutures(checkFut, dontRushFut)
ApiOperation.Timeout
except CancelledError:
# `race()` and `allFutures()` could not cancel Futures.
if not(timerFut.finished()):
timerFut.cancel()
if not(dontRushFut.finished()):
dontRushFut.cancel()
if not(checkFut.finished()):
checkFut.cancel()
await allFutures(checkFut, dontRushFut, timerFut)
ApiOperation.Interrupt
except CatchableError as exc:
# This only could happened if `race` or `allFutures` start raise
# exceptions.
ApiOperation.Failure
if checkOp != ApiOperation.Success:
exitNow = true
break
proc getProposerDuties*(vc: ValidatorClientRef,
epoch: Epoch): Future[DataRestProposerDuties] {.
async.} =
logScope: request = "getProposerDuties"
vc.firstSuccessTimeout(RestResponse[DataRestProposerDuties], SlotDuration,
getProposerDuties(it, epoch)):
if apiResponse.isErr():
debug "Unable to retrieve proposer duties", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status
of 200:
debug "Received successfull response", endpoint = node
return response.data
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
of 503:
debug "Received not synced error response",
response_code = 503, endpoint = node
RestBeaconNodeStatus.NotSynced
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to retrieve proposer duties")
proc getAttesterDuties*(vc: ValidatorClientRef, epoch: Epoch,
validators: seq[ValidatorIndex]
): Future[DataRestAttesterDuties] {.async.} =
logScope: request = "getAttesterDuties"
vc.firstSuccessTimeout(RestResponse[DataRestAttesterDuties], SlotDuration,
getAttesterDuties(it, epoch, validators)):
if apiResponse.isErr():
debug "Unable to retrieve attester duties", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status
of 200:
debug "Received successfull response", endpoint = node
return response.data
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
of 503:
debug "Received not synced error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.NotSynced
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to retrieve attester duties")
proc getHeadStateFork*(vc: ValidatorClientRef): Future[Fork] {.async.} =
logScope: request = "getHeadStateFork"
let stateIdent = StateIdent.init(StateIdentType.Head)
vc.firstSuccessTimeout(RestResponse[DataRestFork], SlotDuration,
getStateFork(it, stateIdent)):
if apiResponse.isErr():
debug "Unable to retrieve head state's fork", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status
of 200:
debug "Received successfull response", endpoint = node
return response.data.data
of 400, 404:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to retrieve head state's fork")
proc getValidators*(vc: ValidatorClientRef,
id: seq[ValidatorIdent]): Future[seq[RestValidator]] {.
async.} =
logScope: request = "getStateValidators"
let stateIdent = StateIdent.init(StateIdentType.Head)
vc.firstSuccessTimeout(RestResponse[DataRestValidatorList], SlotDuration,
getStateValidators(it, stateIdent, id)):
if apiResponse.isErr():
debug "Unable to retrieve head state's validator information",
endpoint = node, error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status
of 200:
debug "Received successfull response", endpoint = node
return response.data.data
of 400, 404:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError,
"Unable to retrieve head state's validator information")
proc produceAttestationData*(vc: ValidatorClientRef, slot: Slot,
committee_index: CommitteeIndex
): Future[AttestationData] {.async.} =
logScope: request = "produceAttestationData"
vc.firstSuccessTimeout(RestResponse[DataRestAttestationData],
OneThirdDuration,
produceAttestationData(it, slot, committee_index)):
if apiResponse.isErr():
debug "Unable to retrieve attestation data", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status
of 200:
debug "Received successfull response", endpoint = node
return response.data.data
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
of 503:
debug "Received not synced error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.NotSynced
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to retrieve attestation data")
proc getAttestationErrorMessage(response: RestPlainResponse): string =
let res = decodeBytes(RestAttestationError, response.data,
response.contentType)
if res.isOk():
let errorObj = res.get()
let failures = errorObj.failures.mapIt(Base10.toString(it.index) & ": " &
it.message)
errorObj.message & ": [" & failures.join(", ") & "]"
else:
"Unable to decode error response: [" & $res.error() & "]"
proc getGenericErrorMessage(response: RestPlainResponse): string =
let res = decodeBytes(RestGenericError, response.data,
response.contentType)
if res.isOk():
let errorObj = res.get()
if errorObj.stacktraces.isSome():
errorObj.message & ": [" & errorObj.stacktraces.get().join("; ") & "]"
else:
errorObj.message
else:
"Unable to decode error response: [" & $res.error() & "]"
proc submitPoolAttestations*(vc: ValidatorClientRef,
data: seq[Attestation]): Future[bool] {.
async.} =
logScope: request = "submitPoolAttestations"
vc.firstSuccessTimeout(RestPlainResponse, SlotDuration,
submitPoolAttestations(it, data)):
if apiResponse.isErr():
debug "Unable to submit attestation", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status
of 200:
debug "Attestation was sucessfully published", endpoint = node
return true
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node,
response_error = response.getAttestationErrorMessage()
RestBeaconNodeStatus.Offline
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node,
response_error = response.getAttestationErrorMessage()
RestBeaconNodeStatus.Offline
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node,
response_error = response.getAttestationErrorMessage()
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to submit attestation")
proc getAggregatedAttestation*(vc: ValidatorClientRef, slot: Slot,
root: Eth2Digest): Future[Attestation] {.
async.} =
logScope: request = "getAggregatedAttestation"
vc.firstSuccessTimeout(RestResponse[DataRestAttestation],
OneThirdDuration,
getAggregatedAttestation(it, root, slot)):
if apiResponse.isErr():
debug "Unable to retrieve aggregated attestation data", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status:
of 200:
debug "Received successfull response", endpoint = node
return response.data.data
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError,
"Unable to retrieve aggregated attestation data")
proc publishAggregateAndProofs*(vc: ValidatorClientRef,
data: seq[SignedAggregateAndProof]): Future[bool] {.
async.} =
logScope: request = "publishAggregateAndProofs"
vc.firstSuccessTimeout(RestPlainResponse, SlotDuration,
publishAggregateAndProofs(it, data)):
if apiResponse.isErr():
debug "Unable to publish aggregate and proofs", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status:
of 200:
debug "Aggregate and proofs was sucessfully published", endpoint = node
return true
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Offline
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Offline
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError,
"Unable to publish aggregate and proofs")
proc produceBlock*(vc: ValidatorClientRef, slot: Slot,
randao_reveal: ValidatorSig,
graffiti: GraffitiBytes): Future[BeaconBlock] {.async.} =
logScope: request = "produceBlock"
vc.firstSuccessTimeout(RestResponse[DataRestBeaconBlock],
SlotDuration,
produceBlock(it, slot, randao_reveal, graffiti)):
if apiResponse.isErr():
debug "Unable to retrieve block data", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status:
of 200:
debug "Received successfull response", endpoint = node
return response.data.data
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
of 503:
debug "Received not synced error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.NotSynced
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to retrieve block data")
proc publishBlock*(vc: ValidatorClientRef,
data: SignedBeaconBlock): Future[bool] {.async.} =
logScope: request = "publishBlock"
vc.firstSuccessTimeout(RestPlainResponse,
SlotDuration, publishBlock(it, data)):
if apiResponse.isErr():
debug "Unable to publish block", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status:
of 200:
debug "Block was successfully published", endpoint = node
return true
of 202:
debug "Block not passed validation, but still published",
endpoint = node
return true
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Offline
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Offline
of 503:
debug "Received not synced error response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.NotSynced
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to publish block")
proc prepareBeaconCommitteeSubnet*(vc: ValidatorClientRef,
data: seq[RestCommitteeSubscription]
): Future[bool] {.async.} =
logScope: request = "prepareBeaconCommitteeSubnet"
vc.firstSuccessTimeout(RestPlainResponse, OneThirdDuration,
prepareBeaconCommitteeSubnet(it, data)):
if apiResponse.isErr():
debug "Unable to prepare committee subnet", endpoint = node,
error = apiResponse.error()
RestBeaconNodeStatus.Offline
else:
let response = apiResponse.get()
case response.status
of 200:
debug "Commitee subnet was successfully prepared", endpoint = node
return true
of 400:
debug "Received invalid request response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
return false
of 500:
debug "Received internal error response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Offline
of 503:
debug "Received not synced error response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.NotSynced
else:
debug "Received unexpected error response",
response_code = response.status, endpoint = node,
response_error = response.getGenericErrorMessage()
RestBeaconNodeStatus.Offline
raise newException(ValidatorApiError, "Unable to prepare committee subnet")

View File

@ -0,0 +1,253 @@
import std/[sets, sequtils]
import chronicles
import common, api, block_service
logScope: service = "attestation_service"
proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
duty: RestAttesterDuty): Future[bool] {.async.} =
let vc = service.client
let validator = vc.attachedValidators.getValidator(duty.pubkey)
if validator.index.isNone():
warn "Validator index is missing", validator = validator.pubKey
return false
let fork = vc.fork.get()
# TODO: signing_root is recomputed in signBlockProposal just after,
# but not for locally attached validators.
let signingRoot =
compute_attestation_root(fork, vc.beaconGenesis.genesis_validators_root,
adata)
let vindex = validator.index.get()
let notSlashable = vc.attachedValidators.slashingProtection
.registerAttestation(vindex, validator.pubKey,
adata.source.epoch,
adata.target.epoch, signingRoot)
if notSlashable.isErr():
warn "Slashing protection activated for attestation", slot = duty.slot,
validator = validator.pubKey, validator_index = duty.validator_index,
badVoteDetails = $notSlashable.error
return false
let attestation = await validator.produceAndSignAttestation(adata,
int(duty.committee_length), Natural(duty.validator_committee_index),
fork, vc.beaconGenesis.genesis_validators_root)
let res =
try:
await vc.submitPoolAttestations(@[attestation])
except ValidatorApiError as exc:
error "Unable to submit attestation", slot = duty.slot,
validator = validator.pubKey, validator_index = duty.validator_index
raise exc
let delay = vc.getDelay(seconds(int64(SECONDS_PER_SLOT) div 3))
if res:
notice "Attestation published", validator = validator.pubKey,
validator_index = duty.validator_index, slot = duty.slot,
delay = delay
return true
else:
warn "Attestation was not accepted by beacon node",
validator = validator.pubKey, validator_index = duty.validator_index,
slot = duty.slot, delay = delay
return false
proc produceAndPublishAttestations*(service: AttestationServiceRef,
slot: Slot, committee_index: CommitteeIndex,
duties: seq[RestAttesterDuty]
): Future[AttestationData] {.
async.} =
doAssert(MAX_VALIDATORS_PER_COMMITTEE <= uint64(high(int)))
let vc = service.client
# This call could raise ValidatorApiError, but it is handled in
# publishAttestationsAndAggregates().
let ad = await vc.produceAttestationData(slot, committee_index)
let pendingAttestations =
block:
var res: seq[Future[bool]]
for duty in duties:
debug "Serving attestation duty", duty = duty, epoch = slot.epoch()
if (duty.slot != ad.slot) or
(uint64(duty.committee_index) != ad.index):
error "Inconsistent validator duties during attestation signing",
validator = duty.pubkey, duty_slot = duty.slot,
duty_index = duty.committee_index,
attestation_slot = ad.slot, attestation_index = ad.index
continue
res.add(service.serveAttestation(ad, duty))
res
let statistics =
block:
var errored, succeed, failed = 0
try:
await allFutures(pendingAttestations)
except CancelledError:
for fut in pendingAttestations:
if not(fut.finished()):
fut.cancel()
await allFutures(pendingAttestations)
for future in pendingAttestations:
if future.done():
if future.read():
inc(succeed)
else:
inc(failed)
else:
inc(errored)
(succeed, errored, failed)
let delay = vc.getDelay(seconds(int64(SECONDS_PER_SLOT) div 3))
debug "Attestation statistics", total = len(pendingAttestations),
succeed = statistics[0], failed_to_deliver = statistics[1],
not_accepted = statistics[2], delay = delay, slot = slot,
committee_index = committeeIndex, duties_count = len(duties)
return ad
proc produceAndPublishAggregates(service: AttestationServiceRef,
adata: AttestationData,
duties: seq[RestAttesterDuty]) {.async.} =
let
vc = service.client
slot = adata.slot
committeeIndex = CommitteeIndex(adata.index)
attestationRoot = adata.hash_tree_root()
genesisRoot = vc.beaconGenesis.genesis_validators_root
let aggAttestation =
try:
await vc.getAggregatedAttestation(slot, attestationRoot)
except ValidatorApiError:
error "Unable to retrieve aggregated attestation data"
return
let aggregateAndProofs =
block:
var res: seq[SignedAggregateAndProof]
for duty in duties:
let validator = vc.attachedValidators.getValidator(duty.pubkey)
let slotSignature = await getSlotSig(validator, vc.fork.get(),
genesisRoot, slot)
if (duty.slot != slot) or (duty.committee_index != committeeIndex):
error "Inconsistent validator duties during aggregate signing",
duty_slot = duty.slot, slot = slot,
duty_committee_index = duty.committee_index,
committee_index = committeeIndex
continue
if is_aggregator(duty.committee_length, slotSignature):
notice "Aggregating", slot = slot, validator = duty.pubkey
let aggAndProof = AggregateAndProof(
aggregator_index: uint64(duty.validator_index),
aggregate: aggAttestation,
selection_proof: slot_signature
)
let signature = await signAggregateAndProof(validator, aggAndProof,
vc.fork.get(),
genesisRoot)
res.add(SignedAggregateAndProof(message: aggAndProof,
signature: signature))
res
let count = len(aggregateAndProofs)
if count > 0:
let res =
try:
await vc.publishAggregateAndProofs(aggregateAndProofs)
except ValidatorApiError:
warn "Unable to publish aggregate and proofs"
return
if res:
notice "Successfully published aggregate and proofs", count = count
else:
warn "Aggregate and proofs not accepted by beacon node", count = count
else:
warn "No aggregate and proofs produced"
proc publishAttestationsAndAggregates(service: AttestationServiceRef,
slot: Slot,
committee_index: CommitteeIndex,
duties: seq[RestAttesterDuty]) {.async.} =
let vc = service.client
let aggregateTime =
# chronos.Duration substraction could not return negative value, in such
# case it will return `ZeroDuration`.
vc.beaconClock.durationToNextSlot() - seconds(int64(SECONDS_PER_SLOT) div 3)
# Waiting for blocks to be published before attesting.
# TODO (cheatfate): Here should be present timeout.
let startTime = Moment.now()
await vc.waitForBlockPublished(slot)
let finishTime = Moment.now()
debug "Block proposal awaited", slot = slot,
duration = (finishTime - startTime)
block:
let delay = vc.getDelay(seconds(int64(SECONDS_PER_SLOT) div 3))
notice "Producing attestations", delay = delay, slot = slot,
committee_index = committee_index,
duties_count = len(duties)
let ad =
try:
await service.produceAndPublishAttestations(slot, committee_index,
duties)
except ValidatorApiError:
error "Unable to proceed attestations"
return
if aggregateTime != ZeroDuration:
await sleepAsync(aggregateTime)
block:
let delay = vc.getDelay(seconds((int64(SECONDS_PER_SLOT) div 3) * 2))
notice "Producing aggregate and proofs", delay = delay
await service.produceAndPublishAggregates(ad, duties)
proc spawnAttestationTasks(service: AttestationServiceRef,
slot: Slot) =
let vc = service.client
let dutiesByCommittee =
block:
var res: Table[CommitteeIndex, seq[RestAttesterDuty]]
let attesters = vc.getAttesterDutiesForSlot(slot)
var default: seq[RestAttesterDuty]
for item in attesters:
res.mgetOrPut(item.committee_index, default).add(item)
res
for index, duties in dutiesByCommittee.pairs():
if len(duties) > 0:
asyncSpawn service.publishAttestationsAndAggregates(slot, index, duties)
proc mainLoop(service: AttestationServiceRef) {.async.} =
let vc = service.client
service.state = ServiceState.Running
try:
while true:
let sleepTime = vc.beaconClock.durationToNextSlot() +
seconds(int64(SECONDS_PER_SLOT) div 3)
let sres = vc.getCurrentSlot()
if sres.isSome():
let currentSlot = sres.get()
service.spawnAttestationTasks(currentSlot)
await sleepAsync(sleepTime)
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
proc init*(t: typedesc[AttestationServiceRef],
vc: ValidatorClientRef): Future[AttestationServiceRef] {.async.} =
debug "Initializing service"
var res = AttestationServiceRef(client: vc, state: ServiceState.Initialized)
return res
proc start*(service: AttestationServiceRef) =
service.lifeFut = mainLoop(service)

View File

@ -0,0 +1,198 @@
import common, api
import chronicles
logScope: service = "block_service"
proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
validator: AttachedValidator) {.async.} =
logScope:
validator = validator.pubKey
slot = slot
wallSlot = currentSlot
let
genesisRoot = vc.beaconGenesis.genesis_validators_root
graffiti =
if vc.config.graffiti.isSome():
vc.config.graffiti.get()
else:
defaultGraffitiBytes()
fork = vc.fork.get()
debug "Publishing block", validator = validator.pubKey,
delay = vc.getDelay(ZeroDuration),
genesis_root = genesisRoot,
graffiti = graffiti, fork = fork, slot = slot,
wall_slot = currentSlot
try:
let randaoReveal = await validator.genRandaoReveal(fork, genesisRoot, slot)
let beaconBlock =
try:
await vc.produceBlock(slot, randaoReveal, graffiti)
except ValidatorApiError as exc:
error "Unable to retrieve block data", slot = currentSlot,
validator = validator.pubKey
return
let blockRoot = hash_tree_root(beaconBlock)
var signedBlock = SignedBeaconBlock(message: beaconBlock,
root: hash_tree_root(beaconBlock))
# TODO: signing_root is recomputed in signBlockProposal just after
let signing_root = compute_block_root(fork, genesisRoot, slot,
signedBlock.root)
let notSlashable = vc.attachedValidators
.slashingProtection
.registerBlock(ValidatorIndex(signedBlock.message.proposer_index),
validator.pubKey, slot, signing_root)
if notSlashable.isOk():
let signature = await validator.signBlockProposal(fork, genesisRoot, slot,
blockRoot)
let signedBlock = SignedBeaconBlock(message: beaconBlock, root: blockRoot,
signature: signature)
let res =
try:
await vc.publishBlock(signedBlock)
except ValidatorApiError:
error "Unable to submit block", slot = currentSlot,
validator = validator.pubKey, block_root = blockRoot,
deposits = len(signedBlock.message.body.deposits),
attestations = len(signedBlock.message.body.attestations),
graffiti = graffiti
return
if res:
notice "Block published", slot = currentSlot,
validator = validator.pubKey, validator_index = validator.index.get(),
deposits = len(signedBlock.message.body.deposits),
attestations = len(signedBlock.message.body.attestations),
graffiti = graffiti
else:
warn "Block was not accepted by beacon node", slot = currentSlot,
validator = validator.pubKey, validator_index = validator.index.get(),
deposits = len(signedBlock.message.body.deposits),
attestations = len(signedBlock.message.body.attestations),
graffiti = graffiti
else:
warn "Slashing protection activated for block proposal",
slot = currentSlot, validator = validator.pubKey,
validator_index = validator.index.get(),
existingProposal = notSlashable.error
except CatchableError as exc:
error "Unexpected error happens while proposing block",
error_name = exc.name, error_msg = exc.msg
proc proposeBlock(vc: ValidatorClientRef, slot: Slot,
proposerKey: ValidatorPubkey) {.async.} =
let (inFuture, timeToSleep) = vc.beaconClock.fromNow(slot)
try:
if inFuture:
debug "Proposing block", timeIn = timeToSleep, validator = proposerKey
await sleepAsync(timeToSleep)
else:
debug "Proposing block", timeIn = 0.seconds, validator = proposerKey
let sres = vc.getCurrentSlot()
if sres.isSome():
let currentSlot = sres.get()
# We need to check that we still have validator in our pool.
let validator = vc.attachedValidators.getValidator(proposerKey)
if isNil(validator):
debug "Validator is not present in pool anymore, exiting",
validator = proposerKey
return
await vc.publishBlock(currentSlot, slot, validator)
except CancelledError:
debug "Proposing task was cancelled", slot = slot, validator = proposerKey
proc spawnProposalTask(vc: ValidatorClientRef,
duty: RestProposerDuty): ProposerTask =
let future = proposeBlock(vc, duty.slot, duty.pubkey)
ProposerTask(future: future, duty: duty)
proc contains(data: openArray[RestProposerDuty], task: ProposerTask): bool =
for item in data:
if (item.pubkey == task.duty.pubkey) and (item.slot == task.duty.slot):
return true
false
proc contains(data: openArray[ProposerTask], duty: RestProposerDuty): bool =
for item in data:
if (item.duty.pubkey == duty.pubkey) and (item.duty.slot == duty.slot):
return true
false
proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch,
dependentRoot: Eth2Digest,
duties: openArray[RestProposerDuty]) =
let epochDuties = vc.proposers.getOrDefault(epoch)
if not(epochDuties.isDefault()):
if epochDuties.dependentRoot != dependentRoot:
warn "Proposer duties re-organization",
prior_dependent_root = epochDuties.dependentRoot,
dependent_root = dependentRoot
let tasks =
block:
var res: seq[ProposerTask]
var hashset = initHashSet[Slot]()
for task in epochDuties.duties:
if task notin duties:
# Task is no more relevant, so cancel it.
debug "Cancelling running proposal duty task",
slot = task.duty.slot, validator = task.duty.pubkey
task.future.cancel()
else:
# If task is already running for proper slot, we keep it alive.
debug "Keep running previous proposal duty task",
slot = task.duty.slot, validator = task.duty.pubkey
res.add(task)
for duty in duties:
if duty notin res:
debug "New proposal duty received", slot = duty.slot,
validator = duty.pubkey
let task = vc.spawnProposalTask(duty)
if duty.slot in hashset:
error "Multiple block proposers for this slot, " &
"producing blocks for all proposers", slot = duty.slot
else:
hashset.incl(duty.slot)
res.add(task)
res
vc.proposers[epoch] = ProposedData.init(epoch, dependentRoot, tasks)
else:
# Spawn new proposer tasks and modify proposers map.
let tasks =
block:
var hashset = initHashSet[Slot]()
var res: seq[ProposerTask]
for duty in duties:
debug "New proposal duty received", slot = duty.slot,
validator = duty.pubkey
let task = vc.spawnProposalTask(duty)
if duty.slot in hashset:
error "Multiple block proposers for this slot, " &
"producing blocks for all proposers", slot = duty.slot
else:
hashset.incl(duty.slot)
res.add(task)
res
vc.proposers[epoch] = ProposedData.init(epoch, dependentRoot, tasks)
proc waitForBlockPublished*(vc: ValidatorClientRef, slot: Slot) {.async.} =
## This procedure will wait for all the block proposal tasks to be finished at
## slot ``slot``
let pendingTasks =
block:
var res: seq[Future[void]]
let epochDuties = vc.proposers.getOrDefault(slot.epoch())
for task in epochDuties.duties:
if task.duty.slot == slot:
if not(task.future.finished()):
res.add(task.future)
res
if len(pendingTasks) > 0:
await allFutures(pendingTasks)

View File

@ -0,0 +1,246 @@
import std/[tables, os, sequtils, strutils]
import chronos, presto, presto/client as presto_client, chronicles, confutils,
json_serialization/std/[options, net],
stew/[base10, results, byteutils]
# Local modules
import ".."/networking/[eth2_network, eth2_discovery],
".."/spec/[datatypes, digest, crypto, helpers, signatures],
".."/rpc/[beacon_rest_api, node_rest_api, validator_rest_api,
config_rest_api, rest_utils, eth2_json_rest_serialization],
".."/validators/[attestation_aggregation, keystore_management,
validator_pool, slashing_protection],
".."/[conf, beacon_clock, version, beacon_node_types,
nimbus_binary_common],
".."/ssz/merkleization,
./eth/db/[kvstore, kvstore_sqlite3]
export os, tables, sequtils, sequtils, chronos, presto, chronicles, confutils,
nimbus_binary_common, version, conf, options, tables, results, base10,
byteutils, eth2_json_rest_serialization, presto_client
export beacon_rest_api, node_rest_api, validator_rest_api, config_rest_api,
rest_utils,
datatypes, crypto, digest, signatures, merkleization,
beacon_clock,
kvstore, kvstore_sqlite3,
keystore_management, slashing_protection, validator_pool,
attestation_aggregation, beacon_node_types
const
SYNC_TOLERANCE* = 4'u64
SLOT_LOOKAHEAD* = 1.seconds
HISTORICAL_DUTIES_EPOCHS* = 2'u64
TIME_DELAY_FROM_SLOT* = 79.milliseconds
SUBSCRIPTION_BUFFER_SLOTS* = 2'u64
type
ServiceState* {.pure.} = enum
Initialized, Running, Error, Closing, Closed
BlockServiceEventRef* = ref object of RootObj
slot*: Slot
proposers*: seq[ValidatorPubKey]
ClientServiceRef* = ref object of RootObj
state*: ServiceState
lifeFut*: Future[void]
client*: ValidatorClientRef
DutiesServiceRef* = ref object of ClientServiceRef
FallbackServiceRef* = ref object of ClientServiceRef
ForkServiceRef* = ref object of ClientServiceRef
AttestationServiceRef* = ref object of ClientServiceRef
BlockServiceRef* = ref object of ClientServiceRef
DutyAndProof* = object
epoch*: Epoch
dependentRoot*: Eth2Digest
data*: RestAttesterDuty
slotSig*: Option[ValidatorSig]
ProposerTask* = object
duty*: RestProposerDuty
future*: Future[void]
ProposedData* = object
epoch*: Epoch
dependentRoot*: Eth2Digest
duties*: seq[ProposerTask]
BeaconNodeServer* = object
client*: RestClientRef
endpoint*: string
config*: Option[RestConfig]
ident*: Option[string]
genesis*: Option[RestBeaconGenesis]
syncInfo*: Option[RestSyncInfo]
status*: RestBeaconNodeStatus
EpochDuties* = object
duties*: Table[Epoch, DutyAndProof]
RestBeaconNodeStatus* {.pure.} = enum
Uninitalized, Offline, Incompatible, NotSynced, Online
BeaconNodeServerRef* = ref BeaconNodeServer
AttesterMap* = Table[ValidatorPubKey, EpochDuties]
ProposerMap* = Table[Epoch, ProposedData]
ValidatorClient* = object
config*: ValidatorClientConf
graffitiBytes*: GraffitiBytes
beaconNodes*: seq[BeaconNodeServerRef]
nodesAvailable*: AsyncEvent
fallbackService*: FallbackServiceRef
forkService*: ForkServiceRef
dutiesService*: DutiesServiceRef
attestationService*: AttestationServiceRef
blockService*: BlockServiceRef
runSlotLoop*: Future[void]
beaconClock*: BeaconClock
attachedValidators*: ValidatorPool
fork*: Option[Fork]
attesters*: AttesterMap
proposers*: ProposerMap
beaconGenesis*: RestBeaconGenesis
proposerTasks*: Table[Slot, seq[ProposerTask]]
ValidatorClientRef* = ref ValidatorClient
ValidatorClientError* = object of CatchableError
ValidatorApiError* = object of ValidatorClientError
const
DefaultDutyAndProof* = DutyAndProof(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64))
SlotDuration* = int64(SECONDS_PER_SLOT).seconds
EpochDuration* = int64(SLOTS_PER_EPOCH * SECONDS_PER_SLOT).seconds
OneThirdDuration* = int64(SECONDS_PER_SLOT div 3).seconds
proc `$`*(bn: BeaconNodeServerRef): string =
if bn.ident.isSome():
bn.client.address.hostname & ":" &
Base10.toString(bn.client.address.port) & " [" & bn.ident.get() & "]"
else:
bn.client.address.hostname & ":" &
Base10.toString(bn.client.address.port)
chronicles.formatIt BeaconNodeServerRef:
$it
chronicles.expandIt(RestAttesterDuty):
pubkey = shortLog(it.pubkey)
slot = it.slot
validator_index = it.validator_index
committee_index = it.committee_index
committee_length = it.committee_length
committees_at_slot = it.committees_at_slot
validator_committee_index = it.validator_committee_index
proc stop*(csr: ClientServiceRef) {.async.} =
if csr.state == ServiceState.Running:
csr.state = ServiceState.Closing
if not(csr.lifeFut.finished()):
await csr.lifeFut.cancelAndWait()
csr.state = ServiceState.Closed
proc isDefault*(dap: DutyAndProof): bool =
dap.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)
proc isDefault*(prd: ProposedData): bool =
prd.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)
proc init*(t: typedesc[DutyAndProof], epoch: Epoch, dependentRoot: Eth2Digest,
duty: RestAttesterDuty,
slotSig: Option[ValidatorSig]): DutyAndProof =
DutyAndProof(epoch: epoch, dependentRoot: dependentRoot, data: duty,
slotSig: slotSig)
proc init*(t: typedesc[ProposedData], epoch: Epoch, dependentRoot: Eth2Digest,
data: openarray[ProposerTask]): ProposedData =
ProposedData(epoch: epoch, dependentRoot: dependentRoot, duties: @data)
proc getCurrentSlot*(vc: ValidatorClientRef): Option[Slot] =
let
wallTime = vc.beaconClock.now()
wallSlot = wallTime.toSlot()
if not(wallSlot.afterGenesis):
let checkGenesisTime = vc.beaconClock.fromNow(toBeaconTime(Slot(0)))
warn "Jump in time detected, something wrong with wallclock",
wall_time = wallTime, genesisIn = checkGenesisTime.offset
none[Slot]()
else:
some(wallSlot.slot)
proc getAttesterDutiesForSlot*(vc: ValidatorClientRef,
slot: Slot): seq[RestAttesterDuty] =
## Returns all `DutyAndPrrof` for the given `slot`.
var res: seq[RestAttesterDuty]
let epoch = slot.epoch()
for key, item in vc.attesters.pairs():
let duty = item.duties.getOrDefault(epoch, DefaultDutyAndProof)
if not(duty.isDefault()):
if duty.data.slot == slot:
res.add(duty.data)
res
proc getDurationToNextAttestation*(vc: ValidatorClientRef,
slot: Slot): string =
var minimumDuration = InfiniteDuration
let currentSlotTime = Duration(slot.toBeaconTime())
let currentEpoch = slot.epoch()
for epoch in [currentEpoch, currentEpoch + 1'u64]:
for key, item in vc.attesters.pairs():
let duty = item.duties.getOrDefault(epoch, DefaultDutyAndProof)
if not(duty.isDefault()):
let dutySlotTime = Duration(duty.data.slot.toBeaconTime())
if dutySlotTime >= currentSlotTime:
let timeLeft = dutySlotTime - currentSlotTime
if timeLeft < minimumDuration:
minimumDuration = timeLeft
if minimumDuration != InfiniteDuration:
break
if minimumDuration == InfiniteDuration:
"<unknown>"
else:
$(minimumDuration + seconds(int64(SECONDS_PER_SLOT) div 3))
proc getDurationToNextBlock*(vc: ValidatorClientRef, slot: Slot): string =
var minimumDuration = InfiniteDuration
var currentSlotTime = Duration(slot.toBeaconTime())
let currentEpoch = slot.epoch()
for epoch in [currentEpoch, currentEpoch + 1'u64]:
let data = vc.proposers.getOrDefault(epoch)
if not(data.isDefault()):
for item in data.duties:
if item.duty.pubkey in vc.attachedValidators:
let proposalSlotTime = Duration(item.duty.slot.toBeaconTime())
if proposalSlotTime >= currentSlotTime:
let timeLeft = proposalSlotTime - currentSlotTime
if timeLeft < minimumDuration:
minimumDuration = timeLeft
if minimumDuration != InfiniteDuration:
break
if minimumDuration == InfiniteDuration:
"<unknown>"
else:
$minimumDuration
iterator attesterDutiesForEpoch*(vc: ValidatorClientRef,
epoch: Epoch): DutyAndProof =
for key, item in vc.attesters.pairs():
let epochDuties = item.duties.getOrDefault(epoch)
if not(isDefault(epochDuties)):
yield epochDuties
proc getDelay*(vc: ValidatorClientRef, instant: Duration): Duration =
let currentBeaconTime = vc.beaconClock.now()
let currentTime = Duration(currentBeaconTime)
let slotStartTime = currentBeaconTime.slotOrZero().toBeaconTime()
let idealTime = Duration(slotStartTime) + instant
currentTime - idealTime

View File

@ -0,0 +1,351 @@
import std/[sets, sequtils]
import chronicles
import common, api, block_service
logScope: service = "duties_service"
type
DutiesServiceLoop* = enum
AttesterLoop, ProposerLoop, IndicesLoop
chronicles.formatIt(DutiesServiceLoop):
case it
of AttesterLoop: "attester_loop"
of ProposerLoop: "proposer_loop"
of IndicesLoop: "index_loop"
proc checkDuty(duty: RestAttesterDuty): bool =
(duty.committee_length <= MAX_VALIDATORS_PER_COMMITTEE) and
(uint64(duty.committee_index) <= MAX_COMMITTEES_PER_SLOT) and
(uint64(duty.validator_committee_index) <= duty.committee_length) and
(uint64(duty.validator_index) <= VALIDATOR_REGISTRY_LIMIT)
proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} =
let stateIdent = StateIdent.init(StateIdentType.Head)
let validatorIdents =
block:
var res: seq[ValidatorIdent]
for validator in vc.attachedValidators.items():
if validator.index.isNone():
res.add(ValidatorIdent.init(validator.pubkey))
res
var validators: seq[RestValidator]
var offset = 0
while offset < len(validatorIdents):
let arraySize = min(MaximumValidatorIds, len(validatorIdents))
let idents =
block:
var res = newSeq[ValidatorIdent](arraySize)
var k = 0
for i in offset ..< arraySize:
res[k] = validatorIdents[i]
inc(k)
res
let res =
try:
await vc.getValidators(idents)
except ValidatorApiError as exc:
error "Unable to retrieve head state's validator information"
return
for item in res:
validators.add(item)
offset += arraySize
for item in validators:
if item.validator.pubkey notin vc.attachedValidators:
warn "Beacon node returned missing validator",
pubKey = item.validator.pubKey, index = item.index
else:
debug "Local validator updated with index",
pubKey = item.validator.pubkey, index = item.index
vc.attachedValidators.updateValidator(item.validator.pubkey,
item.index)
proc pollForAttesterDuties*(vc: ValidatorClientRef,
epoch: Epoch): Future[int] {.async.} =
let validatorIndices =
block:
var res: seq[ValidatorIndex]
for index in vc.attachedValidators.indices():
res.add(index)
res
var duties: seq[RestAttesterDuty]
var currentRoot: Option[Eth2Digest]
var offset = 0
while offset < len(validatorIndices):
let arraySize = min(MaximumValidatorIds, len(validatorIndices))
let indices =
block:
var res = newSeq[ValidatorIndex](arraySize)
var k = 0
for i in offset ..< arraySize:
res[k] = validatorIndices[i]
inc(k)
res
let res =
try:
await vc.getAttesterDuties(epoch, indices)
except ValidatorApiError as exc:
error "Unable to retrieve attester duties", epoch = epoch
return 0
if currentRoot.isNone():
# First request
currentRoot = some(res.dependent_root)
else:
if currentRoot.get() != res.dependent_root:
# `dependent_root` must be equal for all requests/response, if it got
# changed it means that some reorg was happened in beacon node and we
# should re-request all queries again.
offset = 0
continue
for item in res.data:
duties.add(item)
offset += arraySize
let
relevantDuties = duties.filterIt(
checkDuty(it) and (it.pubkey in vc.attachedValidators)
)
dependentRoot = currentRoot.get()
fork = vc.fork.get()
genesisRoot = vc.beaconGenesis.genesis_validators_root
let addOrReplaceItems =
block:
var alreadyWarned = false
var res: seq[tuple[epoch: Epoch, duty: RestAttesterDuty]]
for duty in relevantDuties:
let map = vc.attesters.getOrDefault(duty.pubkey)
let epochDuty = map.duties.getOrDefault(epoch, DefaultDutyAndProof)
if not(epochDuty.isDefault()):
if epochDuty.dependentRoot != dependentRoot:
res.add((epoch, duty))
if not(alreadyWarned):
warn "Attester duties re-organization",
prior_dependent_root = epochDuty.dependentRoot,
dependent_root = dependentRoot
alreadyWarned = true
else:
info "Received new attester duty", duty, epoch = epoch,
dependent_root = dependentRoot
res.add((epoch, duty))
res
if len(addOrReplaceItems) > 0:
var pending: seq[Future[ValidatorSig]]
for item in addOrReplaceItems:
let validator = vc.attachedValidators.getValidator(item.duty.pubkey)
let future = validator.getSlotSig(fork, genesisRoot, item.duty.slot)
pending.add(future)
await allFutures(pending)
for index, fut in pending.pairs():
let item = addOrReplaceItems[index]
let dap =
if fut.done():
DutyAndProof.init(item.epoch, dependentRoot, item.duty,
some(fut.read()))
else:
DutyAndProof.init(item.epoch, dependentRoot, item.duty,
none[ValidatorSig]())
var validatorDuties = vc.attesters.getOrDefault(item.duty.pubkey)
validatorDuties.duties[item.epoch] = dap
vc.attesters[item.duty.pubkey] = validatorDuties
return len(addOrReplaceItems)
proc pruneAttesterDuties(vc: ValidatorClientRef, epoch: Epoch) =
var attesters: AttesterMap
for key, item in vc.attesters.pairs():
var v = EpochDuties()
for epochKey, epochDuty in item.duties.pairs():
if (epochKey + HISTORICAL_DUTIES_EPOCHS) >= epoch:
v.duties[epochKey] = epochDuty
else:
debug "Attester duties for the epoch has been pruned", validator = key,
epoch = epochKey, loop = AttesterLoop
attesters[key] = v
vc.attesters = attesters
proc pollForAttesterDuties*(vc: ValidatorClientRef) {.async.} =
## Query the beacon node for attestation duties for all known validators.
##
## This function will perform (in the following order):
##
## 1. Poll for current-epoch duties and update the local `attesters` map.
## 2. Poll for next-epoch duties and update the local `attesters` map.
## 3. Push out any attestation subnet subscriptions to the BN.
let sres = vc.getCurrentSlot()
if sres.isSome():
let
currentSlot = sres.get()
currentEpoch = currentSlot.epoch()
nextEpoch = currentEpoch + 1'u64
if vc.attachedValidators.count() != 0:
var counts: array[2, tuple[epoch: Epoch, count: int]]
counts[0] = (currentEpoch, await vc.pollForAttesterDuties(currentEpoch))
counts[1] = (nextEpoch, await vc.pollForAttesterDuties(nextEpoch))
if (counts[0].count == 0) and (counts[1].count == 0):
debug "No new attester's duties received", slot = currentSlot
let subscriptions =
block:
var res: seq[RestCommitteeSubscription]
for item in counts:
if item.count > 0:
for duty in vc.attesterDutiesForEpoch(item.epoch):
if currentSlot + SUBSCRIPTION_BUFFER_SLOTS < duty.data.slot:
let isAggregator =
if duty.slotSig.isSome():
is_aggregator(duty.data.committee_length,
duty.slotSig.get())
else:
false
let sub = RestCommitteeSubscription(
validator_index: duty.data.validator_index,
committee_index: duty.data.committee_index,
committees_at_slot: duty.data.committees_at_slot,
slot: duty.data.slot,
is_aggregator: isAggregator
)
res.add(sub)
res
if len(subscriptions) > 0:
let res = await vc.prepareBeaconCommitteeSubnet(subscriptions)
if not(res):
error "Failed to subscribe validators"
vc.pruneAttesterDuties(currentEpoch)
proc pruneBeaconProposers(vc: ValidatorClientRef, epoch: Epoch) =
var proposers: ProposerMap
for epochKey, data in vc.proposers.pairs():
if (epochKey + HISTORICAL_DUTIES_EPOCHS) >= epoch:
proposers[epochKey] = data
else:
debug "Proposer duty has been pruned", epoch = epochKey,
loop = ProposerLoop
vc.proposers = proposers
proc pollForBeaconProposers*(vc: ValidatorClientRef) {.async.} =
let sres = vc.getCurrentSlot()
if sres.isSome():
let
currentSlot = sres.get()
currentEpoch = currentSlot.epoch()
if vc.attachedValidators.count() != 0:
try:
let res = await vc.getProposerDuties(currentEpoch)
let
dependentRoot = res.dependent_root
duties = res.data
relevantDuties = duties.filterIt(it.pubkey in vc.attachedValidators)
if len(relevantDuties) > 0:
vc.addOrReplaceProposers(currentEpoch, dependentRoot, relevantDuties)
else:
debug "No relevant proposer duties received", slot = currentSlot,
duties_count = len(duties)
except ValidatorApiError as exc:
debug "Unable to retrieve proposer duties", slot = currentSlot,
epoch = currentEpoch
vc.pruneBeaconProposers(currentEpoch)
proc waitForNextSlot(service: DutiesServiceRef,
serviceLoop: DutiesServiceLoop) {.async.} =
let vc = service.client
let sleepTime = vc.beaconClock.durationToNextSlot()
await sleepAsync(sleepTime)
proc attesterDutiesLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
while true:
await vc.pollForAttesterDuties()
await service.waitForNextSlot(AttesterLoop)
proc proposerDutiesLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
while true:
await vc.pollForBeaconProposers()
await service.waitForNextSlot(ProposerLoop)
proc validatorIndexLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
while true:
await vc.pollForValidatorIndices()
await service.waitForNextSlot(IndicesLoop)
template checkAndRestart(serviceLoop: DutiesServiceLoop,
future: Future[void], body: untyped): untyped =
if future.finished():
if future.failed():
let error = future.readError()
debug "The loop ended unexpectedly with an error",
error_name = error.name, error_msg = error.msg, loop = serviceLoop
elif future.cancelled():
debug "The loop is interrupted unexpectedly", loop = serviceLoop
else:
debug "The loop is finished unexpectedly without an error",
loop = serviceLoop
future = body
proc mainLoop(service: DutiesServiceRef) {.async.} =
service.state = ServiceState.Running
let vc = service.client
try:
var
fut1 = service.attesterDutiesLoop()
fut2 = service.proposerDutiesLoop()
fut3 = service.validatorIndexLoop()
while true:
var breakLoop = false
try:
discard await race(fut1, fut2, fut3)
except CancelledError:
if not(fut1.finished()): fut1.cancel()
if not(fut2.finished()): fut2.cancel()
if not(fut3.finished()): fut3.cancel()
await allFutures(fut1, fut2, fut3)
breakLoop = true
if breakLoop:
break
checkAndRestart(AttesterLoop, fut1, service.attesterDutiesLoop())
checkAndRestart(ProposerLoop, fut2, service.proposerDutiesLoop())
checkAndRestart(IndicesLoop, fut3, service.validatorIndexLoop())
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
proc init*(t: typedesc[DutiesServiceRef],
vc: ValidatorClientRef): Future[DutiesServiceRef] {.async.} =
var res = DutiesServiceRef(client: vc, state: ServiceState.Initialized)
debug "Initializing service"
# We query for indices first, to avoid empty queries for duties.
await vc.pollForValidatorIndices()
return res
proc start*(service: DutiesServiceRef) =
service.lifeFut = mainLoop(service)

View File

@ -0,0 +1,57 @@
import common, api
logScope: service = "fallback_service"
proc checkNodes*(service: FallbackServiceRef) {.async.} =
let nodesToCheck =
block:
var res: seq[BeaconNodeServerRef]
for item in service.client.beaconNodes:
if item.status != RestBeaconNodeStatus.Online:
res.add(item)
res
let pendingChecks =
block:
var res: seq[Future[void]]
for item in nodesToCheck:
res.add(service.client.checkNode(item))
res
try:
await allFutures(pendingChecks)
except CancelledError as exc:
var pendingCancel: seq[Future[void]]
for fut in pendingChecks:
if not(fut.finished()):
pendingCancel.add(fut.cancelAndWait())
await allFutures(pendingCancel)
raise exc
proc mainLoop(service: FallbackServiceRef) {.async.} =
service.state = ServiceState.Running
try:
while true:
await service.checkNodes()
# Calculating time we need to sleep until
# `time(next_slot) - SLOT_LOOKAHEAD`
let waitTime =
block:
let nextTime = service.client.beaconClock.durationToNextSlot()
if nextTime < SLOT_LOOKAHEAD:
nextTime + seconds(int64(SECONDS_PER_SLOT))
else:
nextTime - SLOT_LOOKAHEAD
await sleepAsync(waitTime)
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
proc init*(t: typedesc[FallbackServiceRef],
vc: ValidatorClientRef): Future[FallbackServiceRef] {.async.} =
debug "Initializing service"
var res = FallbackServiceRef(client: vc, state: ServiceState.Initialized)
# Perform initial nodes check.
await res.checkNodes()
return res
proc start*(service: FallbackServiceRef) =
service.lifeFut = mainLoop(service)

View File

@ -0,0 +1,44 @@
import common, api
import chronicles
logScope: service = "fork_service"
proc pollForFork(vc: ValidatorClientRef) {.async.} =
let fork =
try:
await vc.getHeadStateFork()
except ValidatorApiError as exc:
error "Unable to retrieve head state's fork", reason = exc.msg
return
if vc.fork.isNone() or vc.fork.get() != fork:
vc.fork = some(fork)
notice "Fork update success", fork = fork
proc waitForNextEpoch(service: ForkServiceRef) {.async.} =
let vc = service.client
let sleepTime = vc.beaconClock.durationToNextEpoch() + TIME_DELAY_FROM_SLOT
debug "Sleeping until next epoch", sleep_time = sleepTime
await sleepAsync(sleepTime)
proc mainLoop(service: ForkServiceRef) {.async.} =
service.state = ServiceState.Running
let vc = service.client
debug "Service started"
try:
while true:
await vc.pollForFork()
await service.waitForNextEpoch()
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
proc init*(t: typedesc[ForkServiceRef],
vc: ValidatorClientRef): Future[ForkServiceRef] {.async.} =
debug "Initializing service"
var res = ForkServiceRef(client: vc, state: ServiceState.Initialized)
await vc.pollForFork()
return res
proc start*(service: ForkServiceRef) =
service.lifeFut = mainLoop(service)

View File

@ -67,17 +67,12 @@ proc findValidator(validators: auto, pubKey: ValidatorPubKey):
some(idx.ValidatorIndex)
proc addLocalValidator(node: BeaconNode,
validators: openArray[Validator],
privKey: ValidatorPrivKey) =
let pubKey = privKey.toPubKey()
node.attachedValidators[].addLocalValidator(
pubKey, privKey,
findValidator(validators, pubKey.toPubKey()))
node.attachedValidators[].addLocalValidator(privKey)
proc addLocalValidators*(node: BeaconNode) =
for validatorKey in node.config.validatorKeys:
node.addLocalValidator(
getStateField(node.dag.headState.data, validators).asSeq, validatorKey)
node.addLocalValidator(validatorKey)
proc addRemoteValidators*(node: BeaconNode) {.raises: [Defect, OSError, IOError].} =
# load all the validators from the child process - loop until `end`
@ -90,14 +85,14 @@ proc addRemoteValidators*(node: BeaconNode) {.raises: [Defect, OSError, IOError]
getStateField(node.dag.headState.data, validators).asSeq, key)
pk = key.load()
if pk.isSome():
let v = AttachedValidator(pubKey: pk.get(),
let v = AttachedValidator(pubKey: key,
index: index,
kind: ValidatorKind.remote,
connection: ValidatorConnection(
inStream: node.vcProcess.inputStream,
outStream: node.vcProcess.outputStream,
pubKeyStr: $key))
node.attachedValidators[].addRemoteValidator(pk.get(), v)
node.attachedValidators[].addRemoteValidator(key, v)
else:
warn "Could not load public key", line
@ -239,7 +234,8 @@ proc createAndSendAttestation(node: BeaconNode,
return
if node.config.dumpEnabled:
dump(node.config.dumpDirOutgoing, attestation.data, validator.pubKey.toPubKey())
dump(node.config.dumpDirOutgoing, attestation.data,
validator.pubKey)
let wallTime = node.beaconClock.now()
let deadline = attestationData.slot.toBeaconTime() +
@ -415,7 +411,7 @@ proc proposeBlock(node: BeaconNode,
fork, genesis_validators_root, slot, newBlock.root)
let notSlashable = node.attachedValidators
.slashingProtection
.registerBlock(validator_index, validator.pubkey.toPubKey(), slot, signing_root)
.registerBlock(validator_index, validator.pubkey, slot, signing_root)
if notSlashable.isErr:
warn "Slashing protection activated",
@ -487,7 +483,7 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
.slashingProtection
.registerAttestation(
validator_index,
validator.pubkey.toPubKey(),
validator.pubkey,
data.source.epoch,
data.target.epoch,
signing_root)

View File

@ -26,40 +26,72 @@ func init*(T: type ValidatorPool,
## `genesis_validators_root` is used as an unique ID for the
## blockchain
## `backend` is the KeyValue Store backend
T(
slashingProtection: slashingProtectionDB
)
T(slashingProtection: slashingProtectionDB)
template count*(pool: ValidatorPool): int =
pool.validators.len
len(pool.validators)
proc addLocalValidator*(pool: var ValidatorPool,
pubKey: CookedPubKey,
privKey: ValidatorPrivKey,
index: Option[ValidatorIndex]) =
let v = AttachedValidator(pubKey: pubKey,
index: index,
kind: inProcess,
let pubKey = privKey.toPubKey().toPubKey()
let v = AttachedValidator(kind: inProcess, pubKey: pubKey, index: index,
privKey: privKey)
pool.validators[pubKey.toPubKey()] = v
pool.validators[pubKey] = v
notice "Local validator attached", pubKey, validator = shortLog(v)
validators.set(pool.count().int64)
proc addRemoteValidator*(pool: var ValidatorPool,
pubKey: CookedPubKey,
v: AttachedValidator) =
pool.validators[pubKey.toPubKey()] = v
notice "Remote validator attached", pubKey, validator = shortLog(v)
proc addLocalValidator*(pool: var ValidatorPool, privKey: ValidatorPrivKey) =
let pubKey = privKey.toPubKey().toPubKey()
let v = AttachedValidator(kind: inProcess, pubKey: pubKey, privKey: privKey)
pool.validators[pubKey] = v
notice "Local validator attached", pubKey, validator = shortLog(v)
validators.set(pool.count().int64)
proc addRemoteValidator*(pool: var ValidatorPool, pubKey: ValidatorPubKey,
v: AttachedValidator) =
pool.validators[pubKey] = v
notice "Remote validator attached", pubKey, validator = shortLog(v)
validators.set(pool.count().int64)
proc getValidator*(pool: ValidatorPool,
validatorKey: ValidatorPubKey): AttachedValidator =
pool.validators.getOrDefault(validatorKey)
proc signWithRemoteValidator(v: AttachedValidator, data: Eth2Digest):
Future[ValidatorSig] {.async.} =
proc contains*(pool: ValidatorPool, pubKey: ValidatorPubKey): bool =
## Returns ``true`` if validator with key ``pubKey`` present in ``pool``.
pool.validators.contains(pubKey)
proc removeValidator*(pool: var ValidatorPool, pubKey: ValidatorPubKey) =
## Delete validator with public key ``pubKey`` from ``pool``.
pool.validators.del(pubKey)
proc updateValidator*(pool: var ValidatorPool, pubKey: ValidatorPubKey,
index: ValidatorIndex) =
## Set validator ``index`` to validator with public key ``pubKey`` stored
## in ``pool``.
## This procedure will not raise if validator with public key ``pubKey`` is
## not present in the pool.
var v: AttachedValidator
if pool.validators.pop(pubKey, v):
v.index = some(index)
pool.validators[pubKey] = v
iterator publicKeys*(pool: ValidatorPool): ValidatorPubKey =
for item in pool.validators.keys():
yield item
iterator indices*(pool: ValidatorPool): ValidatorIndex =
for item in pool.validators.values():
if item.index.isSome():
yield item.index.get()
iterator items*(pool: ValidatorPool): AttachedValidator =
for item in pool.validators.values():
yield item
proc signWithRemoteValidator(v: AttachedValidator,
data: Eth2Digest): Future[ValidatorSig] {.async.} =
v.connection.inStream.writeLine(v.connection.pubKeyStr, " ", $data)
v.connection.inStream.flush()
var line = newStringOfCap(120).TaintedString
@ -71,70 +103,81 @@ proc signBlockProposal*(v: AttachedValidator, fork: Fork,
genesis_validators_root: Eth2Digest, slot: Slot,
blockRoot: Eth2Digest): Future[ValidatorSig] {.async.} =
return if v.kind == inProcess:
get_block_signature(
fork, genesis_validators_root, slot, blockRoot, v.privKey).toValidatorSig()
get_block_signature(fork, genesis_validators_root, slot, blockRoot,
v.privKey).toValidatorSig()
else:
let root = compute_block_root(fork, genesis_validators_root, slot, blockRoot)
let root = compute_block_root(fork, genesis_validators_root, slot,
blockRoot)
await signWithRemoteValidator(v, root)
proc signAttestation*(v: AttachedValidator,
data: AttestationData,
fork: Fork, genesis_validators_root: Eth2Digest):
Future[ValidatorSig] {.async.} =
return if v.kind == inProcess:
get_attestation_signature(
fork, genesis_validators_root, data, v.privKey).toValidatorSig()
else:
let root = compute_attestation_root(fork, genesis_validators_root, data)
await signWithRemoteValidator(v, root)
return
if v.kind == inProcess:
get_attestation_signature(fork, genesis_validators_root, data,
v.privKey).toValidatorSig()
else:
let root = compute_attestation_root(fork, genesis_validators_root, data)
await signWithRemoteValidator(v, root)
proc produceAndSignAttestation*(validator: AttachedValidator,
attestationData: AttestationData,
committeeLen: int, indexInCommittee: Natural,
fork: Fork, genesis_validators_root: Eth2Digest):
fork: Fork,
genesis_validators_root: Eth2Digest):
Future[Attestation] {.async.} =
let validatorSignature = await validator.signAttestation(attestationData,
fork, genesis_validators_root)
let validatorSignature =
await validator.signAttestation(attestationData, fork,
genesis_validators_root)
var aggregationBits = CommitteeValidatorsBits.init(committeeLen)
aggregationBits.setBit indexInCommittee
return Attestation(data: attestationData, signature: validatorSignature, aggregation_bits: aggregationBits)
return Attestation(data: attestationData, signature: validatorSignature,
aggregation_bits: aggregationBits)
proc signAggregateAndProof*(v: AttachedValidator,
aggregate_and_proof: AggregateAndProof,
fork: Fork, genesis_validators_root: Eth2Digest):
Future[ValidatorSig] {.async.} =
return if v.kind == inProcess:
get_aggregate_and_proof_signature(
fork, genesis_validators_root, aggregate_and_proof, v.privKey).toValidatorSig()
else:
let root = compute_aggregate_and_proof_root(
fork, genesis_validators_root, aggregate_and_proof)
await signWithRemoteValidator(v, root)
return
if v.kind == inProcess:
get_aggregate_and_proof_signature(fork, genesis_validators_root,
aggregate_and_proof,
v.privKey).toValidatorSig()
else:
let root = compute_aggregate_and_proof_root(fork, genesis_validators_root,
aggregate_and_proof)
await signWithRemoteValidator(v, root)
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#randao-reveal
func genRandaoReveal*(k: ValidatorPrivKey, fork: Fork,
genesis_validators_root: Eth2Digest, slot: Slot): CookedSig =
get_epoch_signature(
fork, genesis_validators_root, slot.compute_epoch_at_slot, k)
genesis_validators_root: Eth2Digest,
slot: Slot): CookedSig =
get_epoch_signature(fork, genesis_validators_root,
slot.compute_epoch_at_slot, k)
proc genRandaoReveal*(v: AttachedValidator, fork: Fork,
genesis_validators_root: Eth2Digest, slot: Slot):
Future[ValidatorSig] {.async.} =
return if v.kind == inProcess:
genRandaoReveal(v.privKey, fork, genesis_validators_root, slot).toValidatorSig()
else:
let root = compute_epoch_root(
fork, genesis_validators_root, slot.compute_epoch_at_slot)
await signWithRemoteValidator(v, root)
return
if v.kind == inProcess:
genRandaoReveal(v.privKey, fork, genesis_validators_root,
slot).toValidatorSig()
else:
let root = compute_epoch_root(fork, genesis_validators_root,
slot.compute_epoch_at_slot)
await signWithRemoteValidator(v, root)
proc getSlotSig*(v: AttachedValidator, fork: Fork,
genesis_validators_root: Eth2Digest, slot: Slot
): Future[ValidatorSig] {.async.} =
return if v.kind == inProcess:
get_slot_signature(
fork, genesis_validators_root, slot, v.privKey).toValidatorSig()
else:
let root = compute_slot_root(fork, genesis_validators_root, slot)
await signWithRemoteValidator(v, root)
return
if v.kind == inProcess:
get_slot_signature(fork, genesis_validators_root, slot,
v.privKey).toValidatorSig()
else:
let root = compute_slot_root(fork, genesis_validators_root, slot)
await signWithRemoteValidator(v, root)

View File

@ -9,7 +9,7 @@
if [[ $OS = "Windows_NT" ]]; then
# Copy file.
cp -a $1 $2
cp -a ${1} ${2};
if [ -f "$1" ]; then
DST_FILE=$2
if [ -d "$2" ]; then
@ -17,18 +17,18 @@ if [[ $OS = "Windows_NT" ]]; then
DST_FILE=$(realpath ${2})/$SRC_NAME
fi
# Single file was copied, so we setting file permissions only.
icacls $DST_FILE /inheritance:r /grant:r $USERDOMAIN\\$USERNAME:\(F\);
icacls ${DST_FILE} /inheritance:r /grant:r $USERDOMAIN\\$USERNAME:\(F\);
else
if [ -d "$1" ]; then
SRC_NAME="$(basename -- $1)"
DST_DIR=$(realpath ${2})/$SRC_NAME
DST_FILES=$(realpath ${DST_DIR})/\*
# Directory was copied, so we update destination directory permissions.
icacls $DST_DIR /inheritance:r /grant:r $USERDOMAIN\\$USERNAME:\(OI\)\(CI\)\(F\);
icacls ${DST_DIR} /inheritance:r /grant:r $USERDOMAIN\\$USERNAME:\(OI\)\(CI\)\(F\);
# And update permissions for all files inside of destination directory.
icacls $DST_FILES /inheritance:r /grant:r $USERDOMAIN\\$USERNAME:\(F\);
icacls ${DST_FILES} /inheritance:r /grant:r $USERDOMAIN\\$USERNAME:\(F\);
fi
fi
else
cp -a $1 $2;
cp -a ${1} ${2};
fi

View File

@ -167,15 +167,15 @@ if [[ "$REUSE_EXISTING_DATA_DIR" == "0" ]]; then
rm -rf "${DATA_DIR}"
fi
mkdir -m 0700 -p "${DATA_DIR}"
scripts/makedir.sh "${DATA_DIR}"
DEPOSITS_FILE="${DATA_DIR}/deposits.json"
VALIDATORS_DIR="${DATA_DIR}/validators"
mkdir -p "${VALIDATORS_DIR}"
scripts/makedir.sh "${VALIDATORS_DIR}"
SECRETS_DIR="${DATA_DIR}/secrets"
mkdir -p "${SECRETS_DIR}"
scripts/makedir.sh "${SECRETS_DIR}"
NETWORK_DIR="${DATA_DIR}/network_dir"
mkdir -p "${NETWORK_DIR}"
@ -338,6 +338,43 @@ BOOTSTRAP_ENR="${DATA_DIR}/node${BOOTSTRAP_NODE}/beacon_node.enr"
NETWORK_KEYFILE="../network_key.json"
for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
# Copy validators to individual nodes.
# The first $NODES_WITH_VALIDATORS nodes split them equally between them,
# after skipping the first $USER_VALIDATORS.
NODE_DATA_DIR="${DATA_DIR}/node${NUM_NODE}"
rm -rf "${NODE_DATA_DIR}"
scripts/makedir.sh "${NODE_DATA_DIR}" 2>&1
scripts/makedir.sh "${NODE_DATA_DIR}/validators" 2>&1
scripts/makedir.sh "${NODE_DATA_DIR}/secrets" 2>&1
if [[ $NUM_NODE -lt $NODES_WITH_VALIDATORS ]]; then
if [ "${USE_VC:-}" == "1" ]; then
VALIDATOR_DATA_DIR="${DATA_DIR}/validator${NUM_NODE}"
rm -rf "${VALIDATOR_DATA_DIR}"
scripts/makedir.sh "${VALIDATOR_DATA_DIR}" 2>&1
scripts/makedir.sh "${VALIDATOR_DATA_DIR}/validators" 2>&1
scripts/makedir.sh "${VALIDATOR_DATA_DIR}/secrets" 2>&1
for VALIDATOR in $(ls "${VALIDATORS_DIR}" | tail -n +$(( $USER_VALIDATORS + ($VALIDATORS_PER_VALIDATOR * $NUM_NODE) + 1 + $VALIDATOR_OFFSET )) | head -n $VALIDATORS_PER_VALIDATOR); do
cp -a "${VALIDATORS_DIR}/${VALIDATOR}" "${VALIDATOR_DATA_DIR}/validators/" 2>&1
cp -a "${SECRETS_DIR}/${VALIDATOR}" "${VALIDATOR_DATA_DIR}/secrets/" 2>&1
done
if [[ $OS = "Windows_NT" ]]; then
find "${VALIDATOR_DATA_DIR}" -type f \( -iname "*.json" -o ! -iname "*.*" \) -exec icacls "{}" /inheritance:r /grant:r ${USERDOMAIN}\\${USERNAME}:\(F\) \;
fi
fi
for VALIDATOR in $(ls "${VALIDATORS_DIR}" | tail -n +$(( $USER_VALIDATORS + ($VALIDATORS_PER_NODE * $NUM_NODE) + 1 )) | head -n $VALIDATORS_PER_NODE); do
cp -a "${VALIDATORS_DIR}/${VALIDATOR}" "${NODE_DATA_DIR}/validators/" 2>&1
cp -a "${SECRETS_DIR}/${VALIDATOR}" "${NODE_DATA_DIR}/secrets/" 2>&1
done
if [[ $OS = "Windows_NT" ]]; then
find "${NODE_DATA_DIR}" -type f \( -iname "*.json" -o ! -iname "*.*" \) -exec icacls "{}" /inheritance:r /grant:r ${USERDOMAIN}\\${USERNAME}:\(F\) \;
fi
fi
done
for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
NODE_DATA_DIR="${DATA_DIR}/node${NUM_NODE}"
VALIDATOR_DATA_DIR="${DATA_DIR}/validator${NUM_NODE}"
if [[ ${NUM_NODE} == ${BOOTSTRAP_NODE} ]]; then
BOOTSTRAP_ARG="--netkey-file=${NETWORK_KEYFILE} --insecure-netkey-password=true"
else
@ -355,33 +392,6 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
done
fi
# Copy validators to individual nodes.
# The first $NODES_WITH_VALIDATORS nodes split them equally between them, after skipping the first $USER_VALIDATORS.
NODE_DATA_DIR="${DATA_DIR}/node${NUM_NODE}"
rm -rf "${NODE_DATA_DIR}"
mkdir -m 0700 -p "${NODE_DATA_DIR}"
mkdir -p "${NODE_DATA_DIR}/validators"
mkdir -p "${NODE_DATA_DIR}/secrets"
if [[ $NUM_NODE -lt $NODES_WITH_VALIDATORS ]]; then
if [ "${USE_VC:-}" == "1" ]; then
VALIDATOR_DATA_DIR="${DATA_DIR}/validator${NUM_NODE}"
rm -rf "${VALIDATOR_DATA_DIR}"
mkdir -p "${VALIDATOR_DATA_DIR}/validators"
mkdir -p "${VALIDATOR_DATA_DIR}/secrets"
for VALIDATOR in $(ls "${VALIDATORS_DIR}" | tail -n +$(( $USER_VALIDATORS + ($VALIDATORS_PER_VALIDATOR * $NUM_NODE) + 1 + $VALIDATOR_OFFSET )) | head -n $VALIDATORS_PER_VALIDATOR); do
cp -a "${VALIDATORS_DIR}/$VALIDATOR" "${VALIDATOR_DATA_DIR}/validators/"
cp -a "${SECRETS_DIR}/${VALIDATOR}" "${VALIDATOR_DATA_DIR}/secrets/"
done
fi
for VALIDATOR in $(ls "${VALIDATORS_DIR}" | tail -n +$(( $USER_VALIDATORS + ($VALIDATORS_PER_NODE * $NUM_NODE) + 1 )) | head -n $VALIDATORS_PER_NODE); do
cp -a "${VALIDATORS_DIR}/$VALIDATOR" "${NODE_DATA_DIR}/validators/"
cp -a "${SECRETS_DIR}/${VALIDATOR}" "${NODE_DATA_DIR}/secrets/"
done
fi
./build/nimbus_beacon_node \
--non-interactive \
--nat:extip:127.0.0.1 \
@ -394,9 +404,9 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
${BOOTSTRAP_ARG} \
${WEB3_ARG} \
${STOP_AT_EPOCH_FLAG} \
--rpc \
--rpc-address="127.0.0.1" \
--rpc-port="$(( BASE_RPC_PORT + NUM_NODE ))" \
--rest \
--rest-address="127.0.0.1" \
--rest-port="$(( BASE_RPC_PORT + NUM_NODE ))" \
--metrics \
--metrics-address="127.0.0.1" \
--metrics-port="$(( BASE_METRICS_PORT + NUM_NODE ))" \
@ -415,7 +425,7 @@ for NUM_NODE in $(seq 0 $(( NUM_NODES - 1 ))); do
--log-level="${LOG_LEVEL}" \
${STOP_AT_EPOCH_FLAG} \
--data-dir="${VALIDATOR_DATA_DIR}" \
--rpc-port="$(( BASE_RPC_PORT + NUM_NODE ))" \
--beacon-node="http://127.0.0.1:$((BASE_RPC_PORT + NUM_NODE))" \
> "${DATA_DIR}/log_val${NUM_NODE}.txt" 2>&1 &
fi
done
@ -456,7 +466,11 @@ else
dump_logs
dump_logtrace
if [[ "${TIMEOUT_DURATION}" != "0" ]]; then
pkill -HUP -P ${WATCHER_PID}
if uname | grep -qiE "mingw|msys"; then
taskkill //F //PID ${WATCHER_PID}
else
pkill -HUP -P ${WATCHER_PID}
fi
fi
exit 1
fi
@ -465,6 +479,9 @@ fi
dump_logtrace
if [[ "${TIMEOUT_DURATION}" != "0" ]]; then
pkill -HUP -P ${WATCHER_PID}
if uname | grep -qiE "mingw|msys"; then
taskkill //F //PID ${WATCHER_PID}
else
pkill -HUP -P ${WATCHER_PID}
fi
fi

2
vendor/nim-chronos vendored

@ -1 +1 @@
Subproject commit 7ccb170f7a0d02d724af03b4fce9c270c41125cb
Subproject commit 15137f71c303dc88bc4acd7db3b9ca35b6f3681e

@ -1 +1 @@
Subproject commit 8b492c74b56c62bcee991a6899d413938a3accc5
Subproject commit 9a56559ae3ce7e81b75ae150c1030adf991bf39c

2
vendor/nim-presto vendored

@ -1 +1 @@
Subproject commit 5163805723deeec11a25004422e9b17922aaa855
Subproject commit 63fcc3df780237b12478931db74b89fe2612d459