fix slow checking of unknown validators (#4590)

We do a linear scan of all pubkeys for each validator and slot - this
becomes expensive with large validator counts.

* normalise BN/VC validator startup logging
* fix crash when host cannot be resolved while adding remote validator
* silence repeated log spam for unknown validators
* print pubkey/index/activation mapping on startup/validator
identification
This commit is contained in:
Jacek Sieka 2023-02-07 15:53:36 +01:00 committed by GitHub
parent 98db005c70
commit 856fcea8d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 144 additions and 150 deletions

View File

@ -95,6 +95,8 @@ type
externalBuilderRegistrations*:
Table[ValidatorPubKey, SignedValidatorRegistrationV1]
mergeAtEpoch*: Epoch
dutyValidatorCount*: int
## Number of validators that we've checked for activation
const
MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT

View File

@ -103,7 +103,7 @@ proc initValidators(sn: var SigningNode): bool =
let feeRecipient = default(Eth1Address)
case keystore.kind
of KeystoreKind.Local:
discard sn.attachedValidators.addLocalValidator(keystore, feeRecipient)
discard sn.attachedValidators.addValidator(keystore, feeRecipient)
publicKeyIdents.add("\"0x" & keystore.pubkey.toHex() & "\"")
of KeystoreKind.Remote:
error "Signing node do not support remote validators",

View File

@ -84,15 +84,9 @@ proc initGenesis(vc: ValidatorClientRef): Future[RestGenesis] {.async.} =
return melem
proc initValidators(vc: ValidatorClientRef): Future[bool] {.async.} =
info "Initializaing validators", path = vc.config.validatorsDir()
info "Loading validators", validatorsDir = vc.config.validatorsDir()
var duplicates: seq[ValidatorPubKey]
for keystore in listLoadableKeystores(vc.config):
let pubkey = keystore.pubkey
if pubkey in duplicates:
warn "Duplicate validator key found", validator_pubkey = pubkey
continue
else:
duplicates.add(pubkey)
vc.addValidator(keystore)
return true
@ -277,9 +271,6 @@ proc asyncInit(vc: ValidatorClientRef): Future[ValidatorClientRef] {.async.} =
vc.syncCommitteeService = await SyncCommitteeServiceRef.init(vc)
vc.keymanagerServer = keymanagerInitResult.server
if vc.keymanagerServer != nil:
func getValidatorData(pubkey: ValidatorPubKey): Opt[ValidatorAndIndex] =
Opt.none(ValidatorAndIndex)
vc.keymanagerHost = newClone KeymanagerHost.init(
validatorPool,
vc.rng,
@ -287,7 +278,7 @@ proc asyncInit(vc: ValidatorClientRef): Future[ValidatorClientRef] {.async.} =
vc.config.validatorsDir,
vc.config.secretsDir,
vc.config.defaultFeeRecipient,
getValidatorData,
nil,
vc.beaconClock.getBeaconTimeFn)
except CatchableError as exc:

View File

@ -126,13 +126,7 @@ proc handleAddRemoteValidatorReq(host: KeymanagerHost,
keystore: RemoteKeystore): RequestItemStatus =
let res = importKeystore(host.validatorPool[], host.validatorsDir, keystore)
if res.isOk:
let
data = host.getValidatorData(keystore.pubkey)
feeRecipient = host.getSuggestedFeeRecipient(keystore.pubkey).valueOr(
host.defaultFeeRecipient)
v = host.validatorPool[].addRemoteValidator(res.get, feeRecipient)
if data.isSome():
v.updateValidator(data.get().index, data.get().validator.activation_epoch)
host.addValidator(res.get())
RequestItemStatus(status: $KeystoreStatus.imported)
else:
@ -199,7 +193,7 @@ proc installKeymanagerHandlers*(router: var RestRouter, host: KeymanagerHost) =
response.data.add(
RequestItemStatus(status: $KeystoreStatus.duplicate))
else:
host.addLocalValidator(res.get())
host.addValidator(res.get())
response.data.add(
RequestItemStatus(status: $KeystoreStatus.imported))

View File

@ -508,37 +508,8 @@ proc addValidator*(vc: ValidatorClientRef, keystore: KeystoreData) =
feeRecipient = vc.config.validatorsDir.getSuggestedFeeRecipient(
keystore.pubkey, vc.config.defaultFeeRecipient).valueOr(
vc.config.defaultFeeRecipient)
case keystore.kind
of KeystoreKind.Local:
discard vc.attachedValidators[].addLocalValidator(keystore, feeRecipient)
of KeystoreKind.Remote:
let
httpFlags =
block:
var res: set[HttpClientFlag]
if RemoteKeystoreFlag.IgnoreSSLVerification in keystore.flags:
res.incl({HttpClientFlag.NoVerifyHost,
HttpClientFlag.NoVerifyServerName})
res
prestoFlags = {RestClientFlag.CommaSeparatedArray}
clients =
block:
var res: seq[(RestClientRef, RemoteSignerInfo)]
for remote in keystore.remotes:
let client = RestClientRef.new($remote.url, prestoFlags,
httpFlags)
if client.isErr():
warn "Unable to resolve distributed signer address",
remote_url = $remote.url, validator = $remote.pubkey
else:
res.add((client.get(), remote))
res
if len(clients) > 0:
discard vc.attachedValidators[].addRemoteValidator(keystore, clients,
feeRecipient)
else:
warn "Unable to initialize remote validator",
validator = $keystore.pubkey
discard vc.attachedValidators[].addValidator(keystore, feeRecipient)
proc removeValidator*(vc: ValidatorClientRef,
pubkey: ValidatorPubKey) {.async.} =

View File

@ -89,15 +89,20 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} =
for item in validators:
var validator = vc.attachedValidators[].getValidator(item.validator.pubkey)
if isNil(validator):
validator.updateValidator(Opt.none ValidatorAndIndex)
missing.add(validatorLog(item.validator.pubkey, item.index))
else:
validator.updateValidator(item.index, item.validator.activation_epoch)
validator.updateValidator(Opt.some ValidatorAndIndex(
index: item.index,
validator: item.validator))
updated.add(validatorLog(item.validator.pubkey, item.index))
list.add(validator)
if len(updated) > 0:
info "Validator indices updated", missing_validators = len(missing),
updated_validators = len(updated)
info "Validator indices updated",
pending = len(validatorIdents) - len(updated),
missing = len(missing),
updated = len(updated)
trace "Validator indices update dump", missing_validators = missing,
updated_validators = updated
vc.indicesAvailable.fire()

View File

@ -64,10 +64,6 @@ type
ImportResult*[T] = Result[T, AddValidatorFailure]
ValidatorAndIndex* = object
index*: ValidatorIndex
validator*: Validator
ValidatorPubKeyToDataFn* =
proc (pubkey: ValidatorPubKey): Opt[ValidatorAndIndex]
{.raises: [Defect], gcsafe.}
@ -109,22 +105,6 @@ func init*(T: type KeymanagerHost,
getValidatorAndIdxFn: getValidatorAndIdxFn,
getBeaconTimeFn: getBeaconTimeFn)
proc getValidatorIdx*(host: KeymanagerHost,
pubkey: ValidatorPubKey): Opt[ValidatorIndex] =
if not(isNil(host.getValidatorAndIdxFn)):
let res = host.getValidatorAndIdxFn(pubkey).valueOr:
return Opt.none ValidatorIndex
Opt.some res.index
else:
Opt.none ValidatorIndex
proc getValidatorData*(host: KeymanagerHost,
pubkey: ValidatorPubKey): Opt[ValidatorAndIndex] =
if not(isNil(host.getValidatorAndIdxFn)):
host.getValidatorAndIdxFn(pubkey)
else:
Opt.none ValidatorAndIndex
proc echoP*(msg: string) =
## Prints a paragraph aligned to 80 columns
echo ""
@ -1324,15 +1304,15 @@ proc getSuggestedFeeRecipient*(
pubkey: ValidatorPubKey): Result[Eth1Address, FeeRecipientStatus] =
host.validatorsDir.getSuggestedFeeRecipient(pubkey, host.defaultFeeRecipient)
proc addLocalValidator*(host: KeymanagerHost, keystore: KeystoreData) =
proc addValidator*(host: KeymanagerHost, keystore: KeystoreData) =
let
data = host.getValidatorData(keystore.pubkey)
feeRecipient = host.getSuggestedFeeRecipient(keystore.pubkey).valueOr(
host.defaultFeeRecipient)
v = host.validatorPool[].addValidator(keystore, feeRecipient)
let v = host.validatorPool[].addLocalValidator(keystore, feeRecipient)
if data.isSome():
v.updateValidator(data.get().index, data.get().validator.activation_epoch)
if not isNil(host.getValidatorAndIdxFn):
let data = host.getValidatorAndIdxFn(keystore.pubkey)
v.updateValidator(data)
proc generateDeposits*(cfg: RuntimeConfig,
rng: var HmacDrbgContext,

View File

@ -99,14 +99,13 @@ proc getValidator*(validators: auto,
if idx == -1:
# We allow adding a validator even if its key is not in the state registry:
# it might be that the deposit for this validator has not yet been processed
notice "Validator deposit not yet processed, monitoring", pubkey
Opt.none ValidatorAndIndex
else:
Opt.some ValidatorAndIndex(index: ValidatorIndex(idx),
validator: validators[idx])
proc addValidators*(node: BeaconNode) =
debug "Loading validators", validatorsDir = node.config.validatorsDir()
info "Loading validators", validatorsDir = node.config.validatorsDir()
let
epoch = node.currentSlot().epoch
for keystore in listLoadableKeystores(node.config):
@ -121,18 +120,8 @@ proc addValidators*(node: BeaconNode) =
feeRecipient = node.consensusManager[].getFeeRecipient(
keystore.pubkey, index, epoch)
let v = case keystore.kind
of KeystoreKind.Local:
node.attachedValidators[].addLocalValidator(keystore, feeRecipient)
of KeystoreKind.Remote:
node.attachedValidators[].addRemoteValidator(keystore, feeRecipient)
if data.isSome:
v.updateValidator(data.get().index, data.get().validator.activation_epoch)
proc getAttachedValidator(node: BeaconNode,
pubkey: ValidatorPubKey): AttachedValidator =
node.attachedValidators[].getValidator(pubkey)
v = node.attachedValidators[].addValidator(keystore, feeRecipient)
v.updateValidator(data)
proc getValidatorForDuties*(
node: BeaconNode,
@ -1460,6 +1449,29 @@ proc registerValidators*(node: BeaconNode, epoch: Epoch) {.async.} =
warn "registerValidators: exception",
error = exc.msg
proc updateValidators(
node: BeaconNode, validators: openArray[Validator]) =
# Since validator indicies are stable, we only check the "updated" range -
# checking all validators would significantly slow down this loop when there
# are many inactive keys
for i in node.dutyValidatorCount..validators.high:
let v = node.attachedValidators[].getValidator(validators[i].pubkey)
if v != nil:
v.index = Opt.some ValidatorIndex(i)
node.dutyValidatorCount = validators.len
for validator in node.attachedValidators[]:
# Check if any validators have been activated
if validator.needsUpdate and validator.index.isSome():
# Activation epoch can change after index is assigned..
let index = validator.index.get()
if index < validators.lenu64:
validator.updateValidator(
Opt.some(ValidatorAndIndex(
index: index, validator: validators[int index]
)))
proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
## Perform validator duties - create blocks, vote and aggregate existing votes
if node.attachedValidators[].count == 0:
@ -1490,18 +1502,8 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
of SyncStatus.synced:
discard # keep going
for validator in node.attachedValidators[]:
# Check if any validators have been activated
# TODO we could do this the other way around and update only validators
# based on a "last-updated" index since only the tail end of the
# validator list in the state ever changes
if validator.needsUpdate:
let data = withState(node.dag.headState):
getValidator(forkyState.data.validators.asSeq(), validator.pubkey)
if data.isSome():
# Activation epoch can change after index is assigned..
validator.updateValidator(
data.get().index, data.get().validator.activation_epoch)
withState(node.dag.headState):
node.updateValidators(forkyState.data.validators.asSeq())
var curSlot = lastSlot + 1

View File

@ -40,6 +40,10 @@ type
ValidatorConnection* = RestClientRef
ValidatorAndIndex* = object
index*: ValidatorIndex
validator*: Validator
DoppelgangerStatus {.pure.} = enum
Unknown, Checking, Checked
@ -52,6 +56,7 @@ type
clients*: seq[(RestClientRef, RemoteSignerInfo)]
threshold*: uint32
updated*: bool
index*: Opt[ValidatorIndex]
## Validator index which is assigned after the eth1 deposit has been
## processed - this index is valid across all eth2 forks for fork depths
@ -111,7 +116,7 @@ func init*(T: type ValidatorPool,
template count*(pool: ValidatorPool): int =
len(pool.validators)
proc addLocalValidator*(
proc addLocalValidator(
pool: var ValidatorPool, keystore: KeystoreData,
feeRecipient: Eth1Address): AttachedValidator =
doAssert keystore.kind == KeystoreKind.Local
@ -132,7 +137,7 @@ proc addLocalValidator*(
v
proc addRemoteValidator*(pool: var ValidatorPool, keystore: KeystoreData,
proc addRemoteValidator(pool: var ValidatorPool, keystore: KeystoreData,
clients: seq[(RestClientRef, RemoteSignerInfo)],
feeRecipient: Eth1Address): AttachedValidator =
doAssert keystore.kind == KeystoreKind.Remote
@ -154,6 +159,42 @@ proc addRemoteValidator*(pool: var ValidatorPool, keystore: KeystoreData,
v
proc addRemoteValidator(pool: var ValidatorPool,
keystore: KeystoreData,
feeRecipient: Eth1Address): AttachedValidator =
let
httpFlags =
if RemoteKeystoreFlag.IgnoreSSLVerification in keystore.flags:
{HttpClientFlag.NoVerifyHost, HttpClientFlag.NoVerifyServerName}
else:
{}
prestoFlags = {RestClientFlag.CommaSeparatedArray}
clients =
block:
var res: seq[(RestClientRef, RemoteSignerInfo)]
for remote in keystore.remotes:
let client = RestClientRef.new($remote.url, prestoFlags, httpFlags)
if client.isErr():
# TODO keep trying in case of temporary network failure
warn "Unable to resolve distributed signer address",
remote_url = $remote.url, validator = $remote.pubkey
else:
res.add((client.get(), remote))
res
pool.addRemoteValidator(keystore, clients, feeRecipient)
proc addValidator*(pool: var ValidatorPool,
keystore: KeystoreData,
feeRecipient: Eth1Address): AttachedValidator =
pool.validators.withValue(keystore.pubkey, v):
notice "Adding already-known validator", validator = shortLog(v[])
return v[]
case keystore.kind
of KeystoreKind.Local: pool.addLocalValidator(keystore, feeRecipient)
of KeystoreKind.Remote: pool.addRemoteValidator(keystore, feeRecipient)
proc getValidator*(pool: ValidatorPool,
validatorKey: ValidatorPubKey): AttachedValidator =
pool.validators.getOrDefault(validatorKey)
@ -178,10 +219,31 @@ proc removeValidator*(pool: var ValidatorPool, pubkey: ValidatorPubKey) =
proc needsUpdate*(validator: AttachedValidator): bool =
validator.index.isNone() or validator.activationEpoch == FAR_FUTURE_EPOCH
proc updateValidator*(validator: AttachedValidator,
index: ValidatorIndex, activationEpoch: Epoch) =
proc updateValidator*(
validator: AttachedValidator, validatorData: Opt[ValidatorAndIndex]) =
defer: validator.updated = true
let
data = validatorData.valueOr:
if not validator.updated:
notice "Validator deposit not yet processed, monitoring",
pubkey = validator.pubkey
return
index = data.index
activationEpoch = data.validator.activation_epoch
## Update activation information for a validator
validator.index = Opt.some(index)
if validator.index != Opt.some data.index:
validator.index = Opt.some data.index
if validator.activationEpoch != data.validator.activation_epoch:
# In theory, activation epoch could change but that's rare enough that it
# shouldn't practically matter for the current uses
info "Validator activation updated",
validator = shortLog(validator), pubkey = validator.pubkey, index,
activationEpoch
validator.activationEpoch = activationEpoch
if validator.doppelStatus == DoppelgangerStatus.Unknown:
@ -200,26 +262,6 @@ proc close*(pool: var ValidatorPool) =
notice "Could not unlock validator's keystore file",
pubkey = validator.pubkey, validator = shortLog(validator)
proc addRemoteValidator*(pool: var ValidatorPool,
keystore: KeystoreData,
feeRecipient: Eth1Address): AttachedValidator =
var clients: seq[(RestClientRef, RemoteSignerInfo)]
let httpFlags =
block:
var res: set[HttpClientFlag]
if RemoteKeystoreFlag.IgnoreSSLVerification in keystore.flags:
res.incl({HttpClientFlag.NoVerifyHost,
HttpClientFlag.NoVerifyServerName})
res
let prestoFlags = {RestClientFlag.CommaSeparatedArray}
for remote in keystore.remotes:
let client = RestClientRef.new($remote.url, prestoFlags, httpFlags)
if client.isErr():
warn "Unable to resolve distributed signer address",
remote_url = $remote.url, validator = $remote.pubkey
clients.add((client.get(), remote))
pool.addRemoteValidator(keystore, clients, feeRecipient)
iterator publicKeys*(pool: ValidatorPool): ValidatorPubKey =
for item in pool.validators.keys():
yield item

View File

@ -11,6 +11,13 @@ import
unittest2,
../beacon_chain/validators/validator_pool
func makeValidatorAndIndex(
index: ValidatorIndex, activation_epoch: Epoch): Opt[ValidatorAndIndex] =
Opt.some ValidatorAndIndex(
index: index,
validator: Validator(activation_epoch: activation_epoch)
)
suite "Validator pool":
test "Doppelganger for genesis validator":
let
@ -19,7 +26,7 @@ suite "Validator pool":
check:
not v.triggersDoppelganger(GENESIS_EPOCH)
v.updateValidator(ValidatorIndex(1), GENESIS_EPOCH)
v.updateValidator(makeValidatorAndIndex(ValidatorIndex(1), GENESIS_EPOCH))
check:
not v.triggersDoppelganger(GENESIS_EPOCH)
@ -33,13 +40,13 @@ suite "Validator pool":
not v.triggersDoppelganger(GENESIS_EPOCH)
not v.triggersDoppelganger(now.epoch())
v.updateValidator(ValidatorIndex(5), FAR_FUTURE_EPOCH)
v.updateValidator(makeValidatorAndIndex(ValidatorIndex(5), FAR_FUTURE_EPOCH))
check: # We still don't know when validator activates so we wouldn't trigger
not v.triggersDoppelganger(GENESIS_EPOCH)
not v.triggersDoppelganger(now.epoch())
v.updateValidator(ValidatorIndex(5), now.epoch())
v.updateValidator(makeValidatorAndIndex(ValidatorIndex(5), now.epoch()))
check:
# Activates in current epoch, shouldn't trigger
@ -50,7 +57,7 @@ suite "Validator pool":
v = AttachedValidator(activationEpoch: FAR_FUTURE_EPOCH)
now = Epoch(10).start_slot()
v.updateValidator(ValidatorIndex(5), now.epoch() - 1)
v.updateValidator(makeValidatorAndIndex(ValidatorIndex(5), now.epoch() - 1))
check:
# Already activated, should trigger
@ -61,7 +68,7 @@ suite "Validator pool":
v = AttachedValidator(activationEpoch: FAR_FUTURE_EPOCH)
now = Epoch(10).start_slot()
v.updateValidator(ValidatorIndex(5), now.epoch() + 1)
v.updateValidator(makeValidatorAndIndex(ValidatorIndex(5), now.epoch() + 1))
check:
# Activates in the future, should not be checked
@ -72,7 +79,7 @@ suite "Validator pool":
v = AttachedValidator(activationEpoch: FAR_FUTURE_EPOCH)
now = Epoch(10).start_slot()
v.updateValidator(ValidatorIndex(5), now.epoch() - 4)
v.updateValidator(makeValidatorAndIndex(ValidatorIndex(5), now.epoch() - 4))
check:
v.triggersDoppelganger(now.epoch)
@ -92,7 +99,7 @@ suite "Validator pool":
check:
not v.triggersDoppelganger(now.epoch)
v.updateValidator(ValidatorIndex(5), now.epoch())
v.updateValidator(makeValidatorAndIndex(ValidatorIndex(5), now.epoch()))
check: # already proven not to validate
not v.triggersDoppelganger(now.epoch)
@ -103,7 +110,7 @@ suite "Validator pool":
now = Epoch(10).start_slot()
v.updateDoppelganger(now.epoch())
v.updateValidator(ValidatorIndex(5), now.epoch() + 1)
v.updateValidator(makeValidatorAndIndex(ValidatorIndex(5), now.epoch() + 1))
check: # doesn't trigger check just after activation
not v.triggersDoppelganger(now.epoch() + 1)