nimbus-eth2/beacon_chain/validator_client/common.nim

908 lines
32 KiB
Nim

# beacon_chain
# Copyright (c) 2021-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import
std/[tables, os, sets, sequtils, strutils, uri],
stew/[base10, results, byteutils],
bearssl/rand, chronos, presto, presto/client as presto_client,
chronicles, confutils, json_serialization/std/[options, net],
metrics, metrics/chronos_httpserver,
".."/spec/datatypes/[phase0, altair],
".."/spec/[eth2_merkleization, helpers, signatures, validator],
".."/spec/eth2_apis/[eth2_rest_serialization, rest_beacon_client,
dynamic_fee_recipients],
".."/validators/[keystore_management, validator_pool, slashing_protection],
".."/[conf, beacon_clock, version, nimbus_binary_common]
from std/times import Time, toUnix, fromUnix, getTime
export
os, sets, sequtils, chronos, presto, chronicles, confutils,
nimbus_binary_common, version, conf, options, tables, results, base10,
byteutils, presto_client, eth2_rest_serialization, rest_beacon_client,
phase0, altair, helpers, signatures, validator, eth2_merkleization,
beacon_clock, keystore_management, slashing_protection, validator_pool,
dynamic_fee_recipients, Time, toUnix, fromUnix, getTime
const
SYNC_TOLERANCE* = 4'u64
SLOT_LOOKAHEAD* = 1.seconds
HISTORICAL_DUTIES_EPOCHS* = 2'u64
TIME_DELAY_FROM_SLOT* = 79.milliseconds
SUBSCRIPTION_BUFFER_SLOTS* = 2'u64
VALIDATOR_DEFAULT_GAS_LIMIT* = 30_000_000'u64 # Stand-in, reasonable default
EPOCHS_BETWEEN_VALIDATOR_REGISTRATION* = 1
DelayBuckets* = [-Inf, -4.0, -2.0, -1.0, -0.5, -0.1, -0.05,
0.05, 0.1, 0.5, 1.0, 2.0, 4.0, 8.0, Inf]
type
ServiceState* {.pure.} = enum
Initialized, Running, Error, Closing, Closed
BlockServiceEventRef* = ref object of RootObj
slot*: Slot
proposers*: seq[ValidatorPubKey]
RegistrationKind* {.pure.} = enum
Cached, IncorrectTime, MissingIndex, MissingFee, ErrorSignature, NoSignature
PendingValidatorRegistration* = object
registration*: SignedValidatorRegistrationV1
future*: Future[SignatureResult]
ClientServiceRef* = ref object of RootObj
name*: string
state*: ServiceState
lifeFut*: Future[void]
client*: ValidatorClientRef
DutiesServiceRef* = ref object of ClientServiceRef
FallbackServiceRef* = ref object of ClientServiceRef
onlineEvent*: AsyncEvent
ForkServiceRef* = ref object of ClientServiceRef
AttestationServiceRef* = ref object of ClientServiceRef
BlockServiceRef* = ref object of ClientServiceRef
SyncCommitteeServiceRef* = ref object of ClientServiceRef
DoppelgangerServiceRef* = ref object of ClientServiceRef
enabled*: bool
DutyAndProof* = object
epoch*: Epoch
dependentRoot*: Eth2Digest
data*: RestAttesterDuty
slotSig*: Option[ValidatorSig]
SyncCommitteeDuty* = object
pubkey*: ValidatorPubKey
validator_index*: ValidatorIndex
validator_sync_committee_index*: IndexInSyncCommittee
SyncDutyAndProof* = object
epoch*: Epoch
data*: SyncCommitteeDuty
slotSig*: Option[ValidatorSig]
SyncCommitteeSubscriptionInfo* = object
validator_index*: ValidatorIndex
validator_sync_committee_indices*: seq[IndexInSyncCommittee]
ProposerTask* = object
duty*: RestProposerDuty
future*: Future[void]
ProposedData* = object
epoch*: Epoch
dependentRoot*: Eth2Digest
duties*: seq[ProposerTask]
BeaconNodeRole* {.pure.} = enum
Duties,
AttestationData, AttestationPublish,
AggregatedData, AggregatedPublish,
BlockProposalData, BlockProposalPublish,
SyncCommitteeData, SyncCommitteePublish
BeaconNodeServer* = object
client*: RestClientRef
endpoint*: string
config*: Option[RestSpecVC]
ident*: Option[string]
genesis*: Option[RestGenesis]
syncInfo*: Option[RestSyncInfo]
status*: RestBeaconNodeStatus
roles*: set[BeaconNodeRole]
logIdent*: string
index*: int
EpochDuties* = object
duties*: Table[Epoch, DutyAndProof]
EpochSyncDuties* = object
duties*: Table[Epoch, SyncDutyAndProof]
RestBeaconNodeStatus* {.pure.} = enum
Uninitalized, Offline, Incompatible, NotSynced, Online
BeaconNodeServerRef* = ref BeaconNodeServer
AttesterMap* = Table[ValidatorPubKey, EpochDuties]
SyncCommitteeDutiesMap* = Table[ValidatorPubKey, EpochSyncDuties]
ProposerMap* = Table[Epoch, ProposedData]
DoppelgangerStatus* {.pure.} = enum
None, Checking, Passed
DoppelgangerAttempt* {.pure.} = enum
None, Failure, SuccessTrue, SuccessFalse
DoppelgangerState* = object
startEpoch*: Epoch
epochsCount*: uint64
lastAttempt*: DoppelgangerAttempt
status*: DoppelgangerStatus
DoppelgangerDetection* = object
startSlot*: Slot
validators*: Table[ValidatorIndex, DoppelgangerState]
ValidatorClient* = object
config*: ValidatorClientConf
metricsServer*: Option[MetricsHttpServerRef]
graffitiBytes*: GraffitiBytes
beaconNodes*: seq[BeaconNodeServerRef]
fallbackService*: FallbackServiceRef
forkService*: ForkServiceRef
dutiesService*: DutiesServiceRef
attestationService*: AttestationServiceRef
blockService*: BlockServiceRef
syncCommitteeService*: SyncCommitteeServiceRef
doppelgangerService*: DoppelgangerServiceRef
runSlotLoopFut*: Future[void]
sigintHandleFut*: Future[void]
sigtermHandleFut*: Future[void]
keymanagerHost*: ref KeymanagerHost
keymanagerServer*: RestServerRef
beaconClock*: BeaconClock
doppelgangerDetection*: DoppelgangerDetection
attachedValidators*: ref ValidatorPool
forks*: seq[Fork]
forksAvailable*: AsyncEvent
nodesAvailable*: AsyncEvent
indicesAvailable*: AsyncEvent
gracefulExit*: AsyncEvent
attesters*: AttesterMap
proposers*: ProposerMap
syncCommitteeDuties*: SyncCommitteeDutiesMap
beaconGenesis*: RestGenesis
proposerTasks*: Table[Slot, seq[ProposerTask]]
dynamicFeeRecipientsStore*: ref DynamicFeeRecipientsStore
validatorsRegCache*: Table[ValidatorPubKey, SignedValidatorRegistrationV1]
rng*: ref HmacDrbgContext
ValidatorClientRef* = ref ValidatorClient
ValidatorClientError* = object of CatchableError
ValidatorApiError* = object of ValidatorClientError
const
DefaultDutyAndProof* = DutyAndProof(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64))
DefaultSyncDutyAndProof* =
SyncDutyAndProof(epoch: Epoch(0xFFFF_FFFF_FFFF_FFFF'u64))
SlotDuration* = int64(SECONDS_PER_SLOT).seconds
OneThirdDuration* = int64(SECONDS_PER_SLOT).seconds div INTERVALS_PER_SLOT
AllBeaconNodeRoles* = {
BeaconNodeRole.Duties,
BeaconNodeRole.AttestationData,
BeaconNodeRole.AttestationPublish,
BeaconNodeRole.AggregatedData,
BeaconNodeRole.AggregatedPublish,
BeaconNodeRole.BlockProposalData,
BeaconNodeRole.BlockProposalPublish,
BeaconNodeRole.SyncCommitteeData,
BeaconNodeRole.SyncCommitteePublish,
}
proc `$`*(roles: set[BeaconNodeRole]): string =
if card(roles) > 0:
if roles != AllBeaconNodeRoles:
var res: seq[string]
if BeaconNodeRole.Duties in roles:
res.add("duties")
if BeaconNodeRole.AttestationData in roles:
res.add("attestation-data")
if BeaconNodeRole.AttestationPublish in roles:
res.add("attestation-publish")
if BeaconNodeRole.AggregatedData in roles:
res.add("aggregated-data")
if BeaconNodeRole.AggregatedPublish in roles:
res.add("aggregated-publish")
if BeaconNodeRole.BlockProposalData in roles:
res.add("block-data")
if BeaconNodeRole.BlockProposalPublish in roles:
res.add("block-publish")
if BeaconNodeRole.SyncCommitteeData in roles:
res.add("sync-data")
if BeaconNodeRole.SyncCommitteePublish in roles:
res.add("sync-publish")
res.join(",")
else:
"{all}"
else:
"{}"
proc shortLog*(roles: set[BeaconNodeRole]): string =
var r = "AGBSD"
if BeaconNodeRole.AttestationData in roles:
if BeaconNodeRole.AttestationPublish in roles: r[0] = 'A' else: r[0] = 'a'
else:
if BeaconNodeRole.AttestationPublish in roles: r[0] = '+' else: r[0] = '-'
if BeaconNodeRole.AggregatedData in roles:
if BeaconNodeRole.AggregatedPublish in roles: r[1] = 'G' else: r[1] = 'g'
else:
if BeaconNodeRole.AggregatedPublish in roles: r[1] = '+' else: r[1] = '-'
if BeaconNodeRole.BlockProposalData in roles:
if BeaconNodeRole.BlockProposalPublish in roles: r[2] = 'B' else: r[2] = 'b'
else:
if BeaconNodeRole.BlockProposalPublish in roles: r[2] = '+' else: r[2] = '-'
if BeaconNodeRole.SyncCommitteeData in roles:
if BeaconNodeRole.SyncCommitteePublish in roles:
r[3] = 'S' else: r[3] = 's'
else:
if BeaconNodeRole.SyncCommitteePublish in roles:
r[3] = '+' else: r[3] = '-'
if BeaconNodeRole.Duties in roles: r[4] = 'D' else: r[4] = '-'
r
proc `$`*(bn: BeaconNodeServerRef): string =
if bn.ident.isSome():
bn.logIdent & "[" & bn.ident.get() & "]"
else:
bn.logIdent
proc validatorLog*(key: ValidatorPubKey,
index: ValidatorIndex): string =
var res = shortLog(key)
res.add('@')
res.add(Base10.toString(uint64(index)))
res
chronicles.expandIt(BeaconNodeServerRef):
node = $it
node_index = it.index
node_roles = shortLog(it.roles)
chronicles.expandIt(RestAttesterDuty):
pubkey = shortLog(it.pubkey)
slot = it.slot
validator_index = it.validator_index
committee_index = it.committee_index
committee_length = it.committee_length
committees_at_slot = it.committees_at_slot
validator_committee_index = it.validator_committee_index
chronicles.expandIt(SyncCommitteeDuty):
pubkey = shortLog(it.pubkey)
validator_index = it.validator_index
validator_sync_committee_index = it.validator_sync_committee_index
proc stop*(csr: ClientServiceRef) {.async.} =
debug "Stopping service", service = csr.name
if csr.state == ServiceState.Running:
csr.state = ServiceState.Closing
if not(csr.lifeFut.finished()):
await csr.lifeFut.cancelAndWait()
csr.state = ServiceState.Closed
debug "Service stopped", service = csr.name
proc isDefault*(dap: DutyAndProof): bool =
dap.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)
proc isDefault*(sdap: SyncDutyAndProof): bool =
sdap.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)
proc isDefault*(prd: ProposedData): bool =
prd.epoch == Epoch(0xFFFF_FFFF_FFFF_FFFF'u64)
proc parseRoles*(data: string): Result[set[BeaconNodeRole], cstring] =
var res: set[BeaconNodeRole]
if len(data) == 0:
return ok(AllBeaconNodeRoles)
let parts = data.split("roles=")
if (len(parts) != 2) or (len(parts[0]) != 0):
return err("Invalid beacon node roles string")
let sroles = parts[1].split(",")
for srole in sroles:
case toLower(strip(srole))
of "":
discard
of "all":
res.incl(AllBeaconNodeRoles)
of "attestation":
res.incl({BeaconNodeRole.AttestationData,
BeaconNodeRole.AttestationPublish})
of "block":
res.incl({BeaconNodeRole.BlockProposalData,
BeaconNodeRole.BlockProposalPublish})
of "aggregated":
res.incl({BeaconNodeRole.AggregatedData,
BeaconNodeRole.AggregatedPublish})
of "sync":
res.incl({BeaconNodeRole.SyncCommitteeData,
BeaconNodeRole.SyncCommitteePublish})
of "attestation-data":
res.incl(BeaconNodeRole.AttestationData)
of "attestation-publish":
res.incl(BeaconNodeRole.AttestationPublish)
of "aggregated-data":
res.incl(BeaconNodeRole.AggregatedData)
of "aggregated-publish":
res.incl(BeaconNodeRole.AggregatedPublish)
of "block-data":
res.incl(BeaconNodeRole.BlockProposalData)
of "block-publish":
res.incl(BeaconNodeRole.BlockProposalPublish)
of "sync-data":
res.incl(BeaconNodeRole.SyncCommitteeData)
of "sync-publish":
res.incl(BeaconNodeRole.SyncCommitteePublish)
of "duties":
res.incl(BeaconNodeRole.Duties)
else:
return err("Invalid beacon node role string found")
ok(res)
proc init*(t: typedesc[BeaconNodeServerRef], remote: Uri,
index: int): Result[BeaconNodeServerRef, string] =
doAssert(index >= 0)
let
flags = {RestClientFlag.CommaSeparatedArray}
client =
block:
let res = RestClientRef.new($remote, flags = flags)
if res.isErr(): return err($res.error())
res.get()
roles =
block:
let res = parseRoles(remote.anchor)
if res.isErr(): return err($res.error())
res.get()
let server = BeaconNodeServerRef(
client: client, endpoint: $remote, index: index, roles: roles,
logIdent: client.address.hostname & ":" &
Base10.toString(client.address.port)
)
ok(server)
proc getMissingRoles*(n: openArray[BeaconNodeServerRef]): set[BeaconNodeRole] =
var res: set[BeaconNodeRole] = AllBeaconNodeRoles
for node in n.items():
res.excl(node.roles)
res
proc init*(t: typedesc[DutyAndProof], epoch: Epoch, dependentRoot: Eth2Digest,
duty: RestAttesterDuty,
slotSig: Option[ValidatorSig]): DutyAndProof =
DutyAndProof(epoch: epoch, dependentRoot: dependentRoot, data: duty,
slotSig: slotSig)
proc init*(t: typedesc[SyncDutyAndProof], epoch: Epoch,
duty: SyncCommitteeDuty,
slotSig: Option[ValidatorSig]): SyncDutyAndProof =
SyncDutyAndProof(epoch: epoch, data: duty, slotSig: slotSig)
proc init*(t: typedesc[ProposedData], epoch: Epoch, dependentRoot: Eth2Digest,
data: openArray[ProposerTask]): ProposedData =
ProposedData(epoch: epoch, dependentRoot: dependentRoot, duties: @data)
proc getCurrentSlot*(vc: ValidatorClientRef): Option[Slot] =
let
wallTime = vc.beaconClock.now()
wallSlot = wallTime.toSlot()
if not(wallSlot.afterGenesis):
let checkGenesisTime = vc.beaconClock.fromNow(start_beacon_time(Slot(0)))
warn "Jump in time detected, something wrong with wallclock",
wall_time = wallTime, genesisIn = checkGenesisTime.offset
none[Slot]()
else:
some(wallSlot.slot)
proc getAttesterDutiesForSlot*(vc: ValidatorClientRef,
slot: Slot): seq[DutyAndProof] =
## Returns all `DutyAndProof` for the given `slot`.
var res: seq[DutyAndProof]
let epoch = slot.epoch()
for key, item in vc.attesters:
let duty = item.duties.getOrDefault(epoch, DefaultDutyAndProof)
if not(duty.isDefault()):
if duty.data.slot == slot:
res.add(duty)
res
proc getSyncCommitteeDutiesForSlot*(vc: ValidatorClientRef,
slot: Slot): seq[SyncDutyAndProof] =
## Returns all `SyncDutyAndProof` for the given `slot`.
var res: seq[SyncDutyAndProof]
let epoch = slot.epoch()
for key, item in mpairs(vc.syncCommitteeDuties):
item.duties.withValue(epoch, duty):
res.add(duty[])
res
proc getDurationToNextAttestation*(vc: ValidatorClientRef,
slot: Slot): string =
var minSlot = FAR_FUTURE_SLOT
let currentEpoch = slot.epoch()
for epoch in [currentEpoch, currentEpoch + 1'u64]:
for key, item in vc.attesters:
let duty = item.duties.getOrDefault(epoch, DefaultDutyAndProof)
if not(duty.isDefault()):
let dutySlotTime = duty.data.slot
if (duty.data.slot < minSlot) and (duty.data.slot >= slot):
minSlot = duty.data.slot
if minSlot != FAR_FUTURE_SLOT:
break
if minSlot == FAR_FUTURE_SLOT:
"<unknown>"
else:
$(minSlot.attestation_deadline() - slot.start_beacon_time())
proc getDurationToNextBlock*(vc: ValidatorClientRef, slot: Slot): string =
var minSlot = FAR_FUTURE_SLOT
let currentEpoch = slot.epoch()
for epoch in [currentEpoch, currentEpoch + 1'u64]:
let data = vc.proposers.getOrDefault(epoch)
if not(data.isDefault()):
for item in data.duties:
if item.duty.pubkey in vc.attachedValidators[]:
if (item.duty.slot < minSlot) and (item.duty.slot >= slot):
minSlot = item.duty.slot
if minSlot != FAR_FUTURE_SLOT:
break
if minSlot == FAR_FUTURE_SLOT:
"<unknown>"
else:
$(minSlot.block_deadline() - slot.start_beacon_time())
iterator attesterDutiesForEpoch*(vc: ValidatorClientRef,
epoch: Epoch): DutyAndProof =
for key, item in vc.attesters:
let epochDuties = item.duties.getOrDefault(epoch)
if not(isDefault(epochDuties)):
yield epochDuties
proc syncMembersSubscriptionInfoForEpoch*(
vc: ValidatorClientRef,
epoch: Epoch): seq[SyncCommitteeSubscriptionInfo] =
var res: seq[SyncCommitteeSubscriptionInfo]
for key, item in mpairs(vc.syncCommitteeDuties):
var cur: SyncCommitteeSubscriptionInfo
var initialized = false
item.duties.withValue(epoch, epochDuties):
if not initialized:
cur.validator_index = epochDuties.data.validator_index
initialized = true
cur.validator_sync_committee_indices.add(
epochDuties.data.validator_sync_committee_index)
if initialized:
res.add cur
res
proc getDelay*(vc: ValidatorClientRef, deadline: BeaconTime): TimeDiff =
vc.beaconClock.now() - deadline
proc getValidator*(vc: ValidatorClientRef,
key: ValidatorPubKey): Opt[AttachedValidator] =
let validator = vc.attachedValidators[].getValidator(key)
if isNil(validator):
info "Validator not in pool anymore", validator = shortLog(validator)
Opt.none(AttachedValidator)
else:
if validator.index.isNone():
info "Validator index is missing", validator = shortLog(validator)
Opt.none(AttachedValidator)
else:
Opt.some(validator)
proc forkAtEpoch*(vc: ValidatorClientRef, epoch: Epoch): Fork =
# If schedule is present, it MUST not be empty.
doAssert(len(vc.forks) > 0)
var res: Fork
for item in vc.forks:
if item.epoch <= epoch:
res = item
else:
break
res
proc getSubcommitteeIndex*(index: IndexInSyncCommittee): SyncSubcommitteeIndex =
SyncSubcommitteeIndex(uint16(index) div SYNC_SUBCOMMITTEE_SIZE)
proc currentSlot*(vc: ValidatorClientRef): Slot =
vc.beaconClock.now().slotOrZero()
proc addDoppelganger*(vc: ValidatorClientRef,
validators: openArray[AttachedValidator]) =
if vc.config.doppelgangerDetection:
let startEpoch = vc.currentSlot().epoch()
var
check: seq[string]
skip: seq[string]
exist: seq[string]
for validator in validators:
let
vindex = validator.index.get()
state =
if (startEpoch == GENESIS_EPOCH) and
(validator.startSlot == GENESIS_SLOT):
DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64,
lastAttempt: DoppelgangerAttempt.None,
status: DoppelgangerStatus.Passed)
else:
if validator.activationEpoch.isSome() and
(validator.activationEpoch.get() >= startEpoch):
DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64,
lastAttempt: DoppelgangerAttempt.None,
status: DoppelgangerStatus.Passed)
else:
DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64,
lastAttempt: DoppelgangerAttempt.None,
status: DoppelgangerStatus.Checking)
res = vc.doppelgangerDetection.validators.hasKeyOrPut(vindex, state)
if res:
exist.add(validatorLog(validator.pubkey, vindex))
else:
if state.status == DoppelgangerStatus.Checking:
check.add(validatorLog(validator.pubkey, vindex))
else:
skip.add(validatorLog(validator.pubkey, vindex))
info "Validator's doppelganger protection activated",
validators_count = len(validators),
pending_check_count = len(check),
skipped_count = len(skip),
exist_count = len(exist)
debug "Validator's doppelganger protection dump",
checking_validators = check,
skip_validators = skip,
existing_validators = exist
proc addDoppelganger*(vc: ValidatorClientRef, validator: AttachedValidator) =
logScope:
validator = shortLog(validator)
if vc.config.doppelgangerDetection:
let
vindex = validator.index.get()
startEpoch = vc.currentSlot().epoch()
state =
if (startEpoch == GENESIS_EPOCH) and
(validator.startSlot == GENESIS_SLOT):
DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64,
lastAttempt: DoppelgangerAttempt.None,
status: DoppelgangerStatus.Passed)
else:
DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64,
lastAttempt: DoppelgangerAttempt.None,
status: DoppelgangerStatus.Checking)
res = vc.doppelgangerDetection.validators.hasKeyOrPut(vindex, state)
if res:
warn "Validator is already in doppelganger table",
validator_index = vindex, start_epoch = startEpoch,
start_slot = validator.startSlot
else:
if state.status == DoppelgangerStatus.Checking:
info "Doppelganger protection activated", validator_index = vindex,
start_epoch = startEpoch, start_slot = validator.startSlot
else:
info "Doppelganger protection skipped", validator_index = vindex,
start_epoch = startEpoch, start_slot = validator.startSlot
proc removeDoppelganger*(vc: ValidatorClientRef, index: ValidatorIndex) =
if vc.config.doppelgangerDetection:
var state: DoppelgangerState
# We do not care about race condition, when validator is not yet added to
# the doppelganger's table, but it should be removed.
discard vc.doppelgangerDetection.validators.pop(index, state)
proc addValidator*(vc: ValidatorClientRef, keystore: KeystoreData) =
let
slot = vc.currentSlot()
feeRecipient = vc.config.validatorsDir.getSuggestedFeeRecipient(
keystore.pubkey, vc.config.defaultFeeRecipient).valueOr(
vc.config.defaultFeeRecipient)
case keystore.kind
of KeystoreKind.Local:
vc.attachedValidators[].addLocalValidator(keystore, Opt.none ValidatorIndex,
feeRecipient, slot)
of KeystoreKind.Remote:
let
httpFlags =
block:
var res: set[HttpClientFlag]
if RemoteKeystoreFlag.IgnoreSSLVerification in keystore.flags:
res.incl({HttpClientFlag.NoVerifyHost,
HttpClientFlag.NoVerifyServerName})
res
prestoFlags = {RestClientFlag.CommaSeparatedArray}
clients =
block:
var res: seq[(RestClientRef, RemoteSignerInfo)]
for remote in keystore.remotes:
let client = RestClientRef.new($remote.url, prestoFlags,
httpFlags)
if client.isErr():
warn "Unable to resolve distributed signer address",
remote_url = $remote.url, validator = $remote.pubkey
else:
res.add((client.get(), remote))
res
if len(clients) > 0:
vc.attachedValidators[].addRemoteValidator(keystore, clients,
Opt.none ValidatorIndex,
feeRecipient, slot)
else:
warn "Unable to initialize remote validator",
validator = $keystore.pubkey
proc removeValidator*(vc: ValidatorClientRef,
pubkey: ValidatorPubKey) {.async.} =
let validator = vc.attachedValidators[].getValidator(pubkey)
if not(isNil(validator)):
if vc.config.doppelgangerDetection:
if validator.index.isSome():
vc.removeDoppelganger(validator.index.get())
case validator.kind
of ValidatorKind.Local:
discard
of ValidatorKind.Remote:
# We must close all the REST clients running for the remote validator.
let pending =
block:
var res: seq[Future[void]]
for item in validator.clients:
res.add(item[0].closeWait())
res
await allFutures(pending)
# Remove validator from ValidatorPool.
vc.attachedValidators[].removeValidator(pubkey)
proc doppelgangerCheck*(vc: ValidatorClientRef,
validator: AttachedValidator): bool =
if vc.config.doppelgangerDetection:
if validator.index.isNone():
false
else:
let
vindex = validator.index.get()
default = DoppelgangerState(status: DoppelgangerStatus.None)
state = vc.doppelgangerDetection.validators.getOrDefault(vindex,
default)
state.status == DoppelgangerStatus.Passed
else:
true
proc doppelgangerCheck*(vc: ValidatorClientRef,
key: ValidatorPubKey): bool =
let validator = vc.getValidator(key).valueOr: return false
vc.doppelgangerCheck(validator)
proc doppelgangerFilter*(
vc: ValidatorClientRef,
duties: openArray[DutyAndProof]
): tuple[filtered: seq[DutyAndProof], skipped: seq[string]] =
var
pending: seq[string]
ready: seq[DutyAndProof]
for duty in duties:
let
vindex = duty.data.validator_index
vkey = duty.data.pubkey
if vc.doppelgangerCheck(vkey):
ready.add(duty)
else:
pending.add(validatorLog(vkey, vindex))
(ready, pending)
proc getFeeRecipient*(vc: ValidatorClientRef, pubkey: ValidatorPubKey,
validatorIdx: ValidatorIndex,
epoch: Epoch): Opt[Eth1Address] =
let dynamicRecipient = vc.dynamicFeeRecipientsStore[].getDynamicFeeRecipient(
validatorIdx, epoch)
if dynamicRecipient.isSome():
Opt.some(dynamicRecipient.get())
else:
let staticRecipient = getSuggestedFeeRecipient(
vc.config.validatorsDir, pubkey, vc.config.defaultFeeRecipient)
if staticRecipient.isOk():
Opt.some(staticRecipient.get())
else:
Opt.none(Eth1Address)
proc prepareProposersList*(vc: ValidatorClientRef,
epoch: Epoch): seq[PrepareBeaconProposer] =
var res: seq[PrepareBeaconProposer]
for validator in vc.attachedValidators[].items():
if validator.index.isSome():
let
index = validator.index.get()
feeRecipient = vc.getFeeRecipient(validator.pubkey, index, epoch)
if feeRecipient.isSome():
res.add(PrepareBeaconProposer(validator_index: index,
fee_recipient: feeRecipient.get()))
res
proc isDefault*(reg: SignedValidatorRegistrationV1): bool =
(reg.message.timestamp == 0'u64) or (reg.message.gas_limit == 0'u64)
proc isExpired*(vc: ValidatorClientRef,
reg: SignedValidatorRegistrationV1, slot: Slot): bool =
let
regTime = fromUnix(int64(reg.message.timestamp))
regSlot =
block:
let res = vc.beaconClock.toSlot(regTime)
if not(res.afterGenesis):
# This case should not be happend, but it could in case of time jumps
# (time could be modified by admin or ntpd).
return false
uint64(res.slot)
if regSlot > slot:
# This case should not be happened, but if it happens (time could be
# modified by admin or ntpd).
false
else:
if (slot - regSlot) div SLOTS_PER_EPOCH >=
EPOCHS_BETWEEN_VALIDATOR_REGISTRATION:
false
else:
true
proc getValidatorRegistraion(
vc: ValidatorClientRef,
validator: AttachedValidator,
timestamp: Time,
fork: Fork
): Result[PendingValidatorRegistration, RegistrationKind] =
if validator.index.isNone():
debug "Validator registration missing validator index",
validator = shortLog(validator)
return err(RegistrationKind.MissingIndex)
let
vindex = validator.index.get()
cached = vc.validatorsRegCache.getOrDefault(validator.pubkey)
currentSlot =
block:
let res = vc.beaconClock.toSlot(timestamp)
if not(res.afterGenesis):
return err(RegistrationKind.IncorrectTime)
res.slot
if cached.isDefault() or vc.isExpired(cached, currentSlot):
let feeRecipient = vc.getFeeRecipient(validator.pubkey, vindex,
currentSlot.epoch())
if feeRecipient.isNone():
debug "Could not get fee recipient for registration data",
validator = shortLog(validator)
return err(RegistrationKind.MissingFee)
var registration =
SignedValidatorRegistrationV1(
message: ValidatorRegistrationV1(
fee_recipient:
ExecutionAddress(data: distinctBase(feeRecipient.get())),
gas_limit: VALIDATOR_DEFAULT_GAS_LIMIT,
timestamp: uint64(timestamp.toUnix()),
pubkey: validator.pubkey
)
)
let sigfut = validator.getBuilderSignature(fork, registration.message)
if sigfut.finished():
# This is short-path if we able to create signature locally.
if not(sigfut.done()):
let exc = sigfut.readError()
debug "Got unexpected exception while signing validator registration",
validator = shortLog(validator), error_name = $exc.name,
error_msg = $exc.msg
return err(RegistrationKind.ErrorSignature)
let sigres = sigfut.read()
if sigres.isErr():
debug "Failed to get signature for validator registration",
validator = shortLog(validator), error = sigres.error()
return err(RegistrationKind.NoSignature)
registration.signature = sigres.get()
# Updating cache table with new signed registration data
vc.validatorsRegCache[registration.message.pubkey] = registration
ok(PendingValidatorRegistration(registration: registration, future: nil))
else:
# Remote signature service involved, cache will be updated later.
ok(PendingValidatorRegistration(registration: registration,
future: sigfut))
else:
# Returning cached result.
err(RegistrationKind.Cached)
proc prepareRegistrationList*(
vc: ValidatorClientRef,
timestamp: Time,
fork: Fork
): Future[seq[SignedValidatorRegistrationV1]] {.async.} =
var
messages: seq[SignedValidatorRegistrationV1]
futures: seq[Future[SignatureResult]]
registrations: seq[SignedValidatorRegistrationV1]
total = vc.attachedValidators[].count()
succeed = 0
bad = 0
errors = 0
indexMissing = 0
feeMissing = 0
cached = 0
timed = 0
for validator in vc.attachedValidators[].items():
let res = vc.getValidatorRegistraion(validator, timestamp, fork)
if res.isOk():
let preg = res.get()
if preg.future.isNil():
registrations.add(preg.registration)
else:
messages.add(preg.registration)
futures.add(preg.future)
else:
case res.error()
of RegistrationKind.Cached: inc(cached)
of RegistrationKind.IncorrectTime: inc(timed)
of RegistrationKind.NoSignature: inc(bad)
of RegistrationKind.ErrorSignature: inc(errors)
of RegistrationKind.MissingIndex: inc(indexMissing)
of RegistrationKind.MissingFee: inc(feeMissing)
succeed = len(registrations)
if len(futures) > 0:
await allFutures(futures)
for index, future in futures.pairs():
if future.done():
let sres = future.read()
if sres.isOk():
var reg = messages[index]
reg.signature = sres.get()
registrations.add(reg)
# Updating cache table
vc.validatorsRegCache[reg.message.pubkey] = reg
inc(succeed)
else:
inc(bad)
else:
inc(errors)
debug "Validator registrations prepared", total = total, succeed = succeed,
cached = cached, bad = bad, errors = errors,
index_missing = indexMissing, fee_missing = feeMissing,
incorrect_time = timed
return registrations