parent
0cfc1b776e
commit
00f083785d
|
@ -905,6 +905,11 @@ type
|
|||
defaultValue: 0
|
||||
name: "stop-at-epoch" .}: uint64
|
||||
|
||||
payloadBuilderEnable* {.
|
||||
desc: "Enable usage of beacon node with external payload builder"
|
||||
defaultValue: false
|
||||
name: "payload-builder" .}: bool
|
||||
|
||||
beaconNodes* {.
|
||||
desc: "URL addresses to one or more beacon node HTTP REST APIs",
|
||||
defaultValue: @[defaultBeaconNodeUri]
|
||||
|
|
|
@ -87,3 +87,8 @@ proc prepareBeaconProposer*(body: seq[PrepareBeaconProposer]): RestPlainResponse
|
|||
rest, endpoint: "/eth/v1/validator/prepare_beacon_proposer",
|
||||
meth: MethodPost.}
|
||||
## https://ethereum.github.io/beacon-APIs/#/ValidatorRequiredApi/prepareBeaconProposer
|
||||
|
||||
proc registerValidator*(body: seq[SignedValidatorRegistrationV1]): RestPlainResponse {.
|
||||
rest, endpoint: "/eth/v1/validator/register_validator",
|
||||
meth: MethodPost.}
|
||||
## https://ethereum.github.io/beacon-APIs/#/Validator/registerValidator
|
||||
|
|
|
@ -2087,3 +2087,43 @@ proc prepareBeaconProposer*(
|
|||
debug "Beacon proposer preparation failed", status = response.status,
|
||||
endpoint = apiResponse.node
|
||||
return count
|
||||
|
||||
proc registerValidator*(
|
||||
vc: ValidatorClientRef,
|
||||
data: seq[SignedValidatorRegistrationV1]
|
||||
): Future[int] {.async.} =
|
||||
logScope: request = "registerValidators"
|
||||
let resp = vc.onceToAll(RestPlainResponse, SlotDuration,
|
||||
{BeaconNodeRole.BlockProposalPublish},
|
||||
registerValidator(it, data))
|
||||
if len(resp.data) == 0:
|
||||
# We did not get any response from beacon nodes.
|
||||
case resp.status
|
||||
of ApiOperation.Success:
|
||||
# This should not be happened, there should be present at least one
|
||||
# successfull response.
|
||||
return 0
|
||||
of ApiOperation.Timeout:
|
||||
debug "Unable to register validators in time",
|
||||
timeout = SlotDuration
|
||||
return 0
|
||||
of ApiOperation.Interrupt:
|
||||
debug "Validator registration was interrupted"
|
||||
return 00
|
||||
of ApiOperation.Failure:
|
||||
debug "Unexpected error happened while registering validators"
|
||||
return 0
|
||||
else:
|
||||
var count = 0
|
||||
for apiResponse in resp.data:
|
||||
if apiResponse.data.isErr():
|
||||
debug "Unable to register validator with beacon node",
|
||||
endpoint = apiResponse.node, error = apiResponse.data.error()
|
||||
else:
|
||||
let response = apiResponse.data.get()
|
||||
if response.status == 200:
|
||||
inc(count)
|
||||
else:
|
||||
debug "Unable to register validators with beacon node",
|
||||
status = response.status, endpoint = apiResponse.node
|
||||
return count
|
||||
|
|
|
@ -18,13 +18,15 @@ import
|
|||
".."/validators/[keystore_management, validator_pool, slashing_protection],
|
||||
".."/[conf, beacon_clock, version, nimbus_binary_common]
|
||||
|
||||
from std/times import Time, toUnix, fromUnix, getTime
|
||||
|
||||
export
|
||||
os, sets, sequtils, chronos, presto, chronicles, confutils,
|
||||
nimbus_binary_common, version, conf, options, tables, results, base10,
|
||||
byteutils, presto_client, eth2_rest_serialization, rest_beacon_client,
|
||||
phase0, altair, helpers, signatures, validator, eth2_merkleization,
|
||||
beacon_clock, keystore_management, slashing_protection, validator_pool,
|
||||
dynamic_fee_recipients
|
||||
dynamic_fee_recipients, Time, toUnix, fromUnix, getTime
|
||||
|
||||
const
|
||||
SYNC_TOLERANCE* = 4'u64
|
||||
|
@ -32,6 +34,8 @@ const
|
|||
HISTORICAL_DUTIES_EPOCHS* = 2'u64
|
||||
TIME_DELAY_FROM_SLOT* = 79.milliseconds
|
||||
SUBSCRIPTION_BUFFER_SLOTS* = 2'u64
|
||||
VALIDATOR_DEFAULT_GAS_LIMIT* = 30_000_000'u64 # Stand-in, reasonable default
|
||||
EPOCHS_BETWEEN_VALIDATOR_REGISTRATION* = 1
|
||||
|
||||
DelayBuckets* = [-Inf, -4.0, -2.0, -1.0, -0.5, -0.1, -0.05,
|
||||
0.05, 0.1, 0.5, 1.0, 2.0, 4.0, 8.0, Inf]
|
||||
|
@ -44,6 +48,13 @@ type
|
|||
slot*: Slot
|
||||
proposers*: seq[ValidatorPubKey]
|
||||
|
||||
RegistrationKind* {.pure.} = enum
|
||||
Cached, IncorrectTime, MissingIndex, MissingFee, ErrorSignature, NoSignature
|
||||
|
||||
PendingValidatorRegistration* = object
|
||||
registration*: SignedValidatorRegistrationV1
|
||||
future*: Future[SignatureResult]
|
||||
|
||||
ClientServiceRef* = ref object of RootObj
|
||||
name*: string
|
||||
state*: ServiceState
|
||||
|
@ -176,6 +187,7 @@ type
|
|||
beaconGenesis*: RestGenesis
|
||||
proposerTasks*: Table[Slot, seq[ProposerTask]]
|
||||
dynamicFeeRecipientsStore*: ref DynamicFeeRecipientsStore
|
||||
validatorsRegCache*: Table[ValidatorPubKey, SignedValidatorRegistrationV1]
|
||||
rng*: ref HmacDrbgContext
|
||||
|
||||
ValidatorClientRef* = ref ValidatorClient
|
||||
|
@ -736,3 +748,160 @@ proc prepareProposersList*(vc: ValidatorClientRef,
|
|||
res.add(PrepareBeaconProposer(validator_index: index,
|
||||
fee_recipient: feeRecipient.get()))
|
||||
res
|
||||
|
||||
proc isDefault*(reg: SignedValidatorRegistrationV1): bool =
|
||||
(reg.message.timestamp == 0'u64) or (reg.message.gas_limit == 0'u64)
|
||||
|
||||
proc isExpired*(vc: ValidatorClientRef,
|
||||
reg: SignedValidatorRegistrationV1, slot: Slot): bool =
|
||||
let
|
||||
regTime = fromUnix(int64(reg.message.timestamp))
|
||||
regSlot =
|
||||
block:
|
||||
let res = vc.beaconClock.toSlot(regTime)
|
||||
if not(res.afterGenesis):
|
||||
# This case should not be happend, but it could in case of time jumps
|
||||
# (time could be modified by admin or ntpd).
|
||||
return false
|
||||
uint64(res.slot)
|
||||
|
||||
if regSlot > slot:
|
||||
# This case should not be happened, but if it happens (time could be
|
||||
# modified by admin or ntpd).
|
||||
false
|
||||
else:
|
||||
if (slot - regSlot) div SLOTS_PER_EPOCH >=
|
||||
EPOCHS_BETWEEN_VALIDATOR_REGISTRATION:
|
||||
false
|
||||
else:
|
||||
true
|
||||
|
||||
proc getValidatorRegistraion(
|
||||
vc: ValidatorClientRef,
|
||||
validator: AttachedValidator,
|
||||
timestamp: Time,
|
||||
fork: Fork
|
||||
): Result[PendingValidatorRegistration, RegistrationKind] =
|
||||
if validator.index.isNone():
|
||||
debug "Validator registration missing validator index",
|
||||
validator = shortLog(validator)
|
||||
return err(RegistrationKind.MissingIndex)
|
||||
|
||||
let
|
||||
vindex = validator.index.get()
|
||||
cached = vc.validatorsRegCache.getOrDefault(validator.pubkey)
|
||||
currentSlot =
|
||||
block:
|
||||
let res = vc.beaconClock.toSlot(timestamp)
|
||||
if not(res.afterGenesis):
|
||||
return err(RegistrationKind.IncorrectTime)
|
||||
res.slot
|
||||
|
||||
if cached.isDefault() or vc.isExpired(cached, currentSlot):
|
||||
let feeRecipient = vc.getFeeRecipient(validator.pubkey, vindex,
|
||||
currentSlot.epoch())
|
||||
if feeRecipient.isNone():
|
||||
debug "Could not get fee recipient for registration data",
|
||||
validator = shortLog(validator)
|
||||
return err(RegistrationKind.MissingFee)
|
||||
|
||||
var registration =
|
||||
SignedValidatorRegistrationV1(
|
||||
message: ValidatorRegistrationV1(
|
||||
fee_recipient:
|
||||
ExecutionAddress(data: distinctBase(feeRecipient.get())),
|
||||
gas_limit: VALIDATOR_DEFAULT_GAS_LIMIT,
|
||||
timestamp: uint64(timestamp.toUnix()),
|
||||
pubkey: validator.pubkey
|
||||
)
|
||||
)
|
||||
|
||||
let sigfut = validator.getBuilderSignature(fork, registration.message)
|
||||
if sigfut.finished():
|
||||
# This is short-path if we able to create signature locally.
|
||||
if not(sigfut.done()):
|
||||
let exc = sigfut.readError()
|
||||
debug "Got unexpected exception while signing validator registration",
|
||||
validator = shortLog(validator), error_name = $exc.name,
|
||||
error_msg = $exc.msg
|
||||
return err(RegistrationKind.ErrorSignature)
|
||||
let sigres = sigfut.read()
|
||||
if sigres.isErr():
|
||||
debug "Failed to get signature for validator registration",
|
||||
validator = shortLog(validator), error = sigres.error()
|
||||
return err(RegistrationKind.NoSignature)
|
||||
registration.signature = sigres.get()
|
||||
# Updating cache table with new signed registration data
|
||||
vc.validatorsRegCache[registration.message.pubkey] = registration
|
||||
ok(PendingValidatorRegistration(registration: registration, future: nil))
|
||||
else:
|
||||
# Remote signature service involved, cache will be updated later.
|
||||
ok(PendingValidatorRegistration(registration: registration,
|
||||
future: sigfut))
|
||||
else:
|
||||
# Returning cached result.
|
||||
err(RegistrationKind.Cached)
|
||||
|
||||
proc prepareRegistrationList*(
|
||||
vc: ValidatorClientRef,
|
||||
timestamp: Time,
|
||||
fork: Fork
|
||||
): Future[seq[SignedValidatorRegistrationV1]] {.async.} =
|
||||
|
||||
var
|
||||
messages: seq[SignedValidatorRegistrationV1]
|
||||
futures: seq[Future[SignatureResult]]
|
||||
registrations: seq[SignedValidatorRegistrationV1]
|
||||
total = vc.attachedValidators[].count()
|
||||
succeed = 0
|
||||
bad = 0
|
||||
errors = 0
|
||||
indexMissing = 0
|
||||
feeMissing = 0
|
||||
cached = 0
|
||||
timed = 0
|
||||
|
||||
for validator in vc.attachedValidators[].items():
|
||||
let res = vc.getValidatorRegistraion(validator, timestamp, fork)
|
||||
if res.isOk():
|
||||
let preg = res.get()
|
||||
if preg.future.isNil():
|
||||
registrations.add(preg.registration)
|
||||
else:
|
||||
messages.add(preg.registration)
|
||||
futures.add(preg.future)
|
||||
else:
|
||||
case res.error()
|
||||
of RegistrationKind.Cached: inc(cached)
|
||||
of RegistrationKind.IncorrectTime: inc(timed)
|
||||
of RegistrationKind.NoSignature: inc(bad)
|
||||
of RegistrationKind.ErrorSignature: inc(errors)
|
||||
of RegistrationKind.MissingIndex: inc(indexMissing)
|
||||
of RegistrationKind.MissingFee: inc(feeMissing)
|
||||
|
||||
succeed = len(registrations)
|
||||
|
||||
if len(futures) > 0:
|
||||
await allFutures(futures)
|
||||
|
||||
for index, future in futures.pairs():
|
||||
if future.done():
|
||||
let sres = future.read()
|
||||
if sres.isOk():
|
||||
var reg = messages[index]
|
||||
reg.signature = sres.get()
|
||||
registrations.add(reg)
|
||||
# Updating cache table
|
||||
vc.validatorsRegCache[reg.message.pubkey] = reg
|
||||
inc(succeed)
|
||||
else:
|
||||
inc(bad)
|
||||
else:
|
||||
inc(errors)
|
||||
|
||||
debug "Validator registrations prepared", total = total, succeed = succeed,
|
||||
cached = cached, bad = bad, errors = errors,
|
||||
index_missing = indexMissing, fee_missing = feeMissing,
|
||||
incorrect_time = timed
|
||||
|
||||
return registrations
|
||||
|
|
|
@ -17,7 +17,7 @@ logScope: service = ServiceName
|
|||
type
|
||||
DutiesServiceLoop* = enum
|
||||
AttesterLoop, ProposerLoop, IndicesLoop, SyncCommitteeLoop,
|
||||
ProposerPreparationLoop
|
||||
ProposerPreparationLoop, ValidatorRegisterLoop
|
||||
|
||||
chronicles.formatIt(DutiesServiceLoop):
|
||||
case it
|
||||
|
@ -26,6 +26,7 @@ chronicles.formatIt(DutiesServiceLoop):
|
|||
of IndicesLoop: "index_loop"
|
||||
of SyncCommitteeLoop: "sync_committee_loop"
|
||||
of ProposerPreparationLoop: "proposer_prepare_loop"
|
||||
of ValidatorRegisterLoop: "validator_register_loop"
|
||||
|
||||
proc checkDuty(duty: RestAttesterDuty): bool =
|
||||
(duty.committee_length <= MAX_VALIDATORS_PER_COMMITTEE) and
|
||||
|
@ -551,6 +552,54 @@ proc prepareBeaconProposers*(service: DutiesServiceRef) {.async.} =
|
|||
proposers_count = len(proposers),
|
||||
prepared_count = count
|
||||
|
||||
proc registerValidators*(service: DutiesServiceRef) {.async.} =
|
||||
let vc = service.client
|
||||
let sres = vc.getCurrentSlot()
|
||||
|
||||
var default: seq[SignedValidatorRegistrationV1]
|
||||
if sres.isSome():
|
||||
let
|
||||
genesisFork = vc.forks[0]
|
||||
currentSlot = sres.get()
|
||||
registrations =
|
||||
try:
|
||||
await vc.prepareRegistrationList(getTime(), genesisFork)
|
||||
except CancelledError as exc:
|
||||
debug "Validator registration preparation was interrupted",
|
||||
slot = currentSlot, fork = genesisFork
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
error "Unexpected error occured while preparing validators " &
|
||||
"registration data", slot = currentSlot, fork = genesisFork,
|
||||
err_name = exc.name, err_msg = exc.msg
|
||||
default
|
||||
|
||||
let count =
|
||||
if len(registrations) > 0:
|
||||
try:
|
||||
await registerValidator(vc, registrations)
|
||||
except ValidatorApiError as exc:
|
||||
warn "Unable to register validators", slot = currentSlot,
|
||||
fork = genesisFork, err_name = exc.name,
|
||||
err_msg = exc.msg
|
||||
0
|
||||
except CancelledError as exc:
|
||||
debug "Validator registration was interrupted", slot = currentSlot,
|
||||
fork = genesisFork
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
error "Unexpected error occured while registering validators",
|
||||
slot = currentSlot, fork = genesisFork, err_name = exc.name,
|
||||
err_msg = exc.msg
|
||||
0
|
||||
else:
|
||||
0
|
||||
|
||||
if count > 0:
|
||||
debug "Validators registered", slot = currentSlot,
|
||||
beacon_nodes_count = count, registrations = len(registrations),
|
||||
validators_count = vc.attachedValidators[].count()
|
||||
|
||||
proc waitForNextSlot(service: DutiesServiceRef,
|
||||
serviceLoop: DutiesServiceLoop) {.async.} =
|
||||
let vc = service.client
|
||||
|
@ -585,12 +634,23 @@ proc validatorIndexLoop(service: DutiesServiceRef) {.async.} =
|
|||
|
||||
proc proposerPreparationsLoop(service: DutiesServiceRef) {.async.} =
|
||||
let vc = service.client
|
||||
|
||||
debug "Beacon proposer preparation loop waiting for validator indices update"
|
||||
await vc.indicesAvailable.wait()
|
||||
while true:
|
||||
await service.prepareBeaconProposers()
|
||||
await service.waitForNextSlot(ProposerPreparationLoop)
|
||||
|
||||
proc validatorRegisterLoop(service: DutiesServiceRef) {.async.} =
|
||||
let vc = service.client
|
||||
doAssert(vc.config.payloadBuilderEnable)
|
||||
|
||||
debug "Validator registration loop is waiting for initialization"
|
||||
await allFutures(vc.indicesAvailable.wait(), vc.forksAvailable.wait())
|
||||
while true:
|
||||
await service.registerValidators()
|
||||
await service.waitForNextSlot(ValidatorRegisterLoop)
|
||||
|
||||
proc syncCommitteeDutiesLoop(service: DutiesServiceRef) {.async.} =
|
||||
let vc = service.client
|
||||
|
||||
|
@ -616,6 +676,8 @@ template checkAndRestart(serviceLoop: DutiesServiceLoop,
|
|||
future = body
|
||||
|
||||
proc mainLoop(service: DutiesServiceRef) {.async.} =
|
||||
let vc = service.client
|
||||
|
||||
service.state = ServiceState.Running
|
||||
debug "Service started"
|
||||
|
||||
|
@ -625,6 +687,11 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
|
|||
indicesFut = service.validatorIndexLoop()
|
||||
syncFut = service.syncCommitteeDutiesLoop()
|
||||
prepareFut = service.proposerPreparationsLoop()
|
||||
registerFut =
|
||||
if vc.config.payloadBuilderEnable:
|
||||
service.validatorRegisterLoop()
|
||||
else:
|
||||
nil
|
||||
|
||||
while true:
|
||||
# This loop could look much more nicer/better, when
|
||||
|
@ -632,8 +699,11 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
|
|||
# become safe to combine loops, breaks and exception handlers.
|
||||
let breakLoop =
|
||||
try:
|
||||
discard await race(attestFut, proposeFut, indicesFut, syncFut,
|
||||
prepareFut)
|
||||
var futures = @[FutureBase(attestFut), FutureBase(proposeFut),
|
||||
FutureBase(indicesFut), FutureBase(syncFut),
|
||||
FutureBase(prepareFut)]
|
||||
if not(isNil(registerFut)): futures.add(FutureBase(registerFut))
|
||||
discard await race(futures)
|
||||
checkAndRestart(AttesterLoop, attestFut, service.attesterDutiesLoop())
|
||||
checkAndRestart(ProposerLoop, proposeFut, service.proposerDutiesLoop())
|
||||
checkAndRestart(IndicesLoop, indicesFut, service.validatorIndexLoop())
|
||||
|
@ -641,6 +711,9 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
|
|||
service.syncCommitteeDutiesLoop())
|
||||
checkAndRestart(ProposerPreparationLoop, prepareFut,
|
||||
service.proposerPreparationsLoop())
|
||||
if not(isNil(registerFut)):
|
||||
checkAndRestart(ValidatorRegisterLoop, registerFut,
|
||||
service.validatorRegisterLoop())
|
||||
false
|
||||
except CancelledError:
|
||||
debug "Service interrupted"
|
||||
|
@ -655,6 +728,8 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
|
|||
pending.add(syncFut.cancelAndWait())
|
||||
if not(prepareFut.finished()):
|
||||
pending.add(prepareFut.cancelAndWait())
|
||||
if not(isNil(registerFut)) and not(registerFut.finished()):
|
||||
pending.add(registerFut.cancelAndWait())
|
||||
await allFutures(pending)
|
||||
true
|
||||
except CatchableError as exc:
|
||||
|
|
Loading…
Reference in New Issue