VC: some fixes (#4240)

* Skip doppelganger protection for validators which activated just now or in future.

* Fix sync committee duties spam issue.

* Optimize sync committee duties logging statements.

* Fix missing lazyWait.

* Add short path.

* Address #4087.

* Add missing watch for crash.
This commit is contained in:
Eugene Kabanov 2022-10-21 17:53:30 +03:00 committed by GitHub
parent 593b3cee20
commit 367e7052f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 209 additions and 44 deletions

View File

@ -211,6 +211,8 @@ proc new*(T: type ValidatorClientRef,
nodesAvailable: newAsyncEvent(),
forksAvailable: newAsyncEvent(),
gracefulExit: newAsyncEvent(),
indicesAvailable: newAsyncEvent(),
dynamicFeeRecipientsStore: newClone(DynamicFeeRecipientsStore.init()),
sigintHandleFut: waitSignal(SIGINT),
sigtermHandleFut: waitSignal(SIGTERM)
)
@ -222,7 +224,9 @@ proc new*(T: type ValidatorClientRef,
graffitiBytes: config.graffiti.get(defaultGraffitiBytes()),
nodesAvailable: newAsyncEvent(),
forksAvailable: newAsyncEvent(),
indicesAvailable: newAsyncEvent(),
gracefulExit: newAsyncEvent(),
dynamicFeeRecipientsStore: newClone(DynamicFeeRecipientsStore.init()),
sigintHandleFut: newFuture[void]("sigint_placeholder"),
sigtermHandleFut: newFuture[void]("sigterm_placeholder")
)

View File

@ -45,7 +45,25 @@ proc `$`*(strategy: ApiStrategyKind): string =
of ApiStrategyKind.Priority:
"priority"
proc lazyWait(futures: seq[FutureBase], timerFut: Future[void]) {.async.} =
proc lazyWaiter(node: BeaconNodeServerRef, request: FutureBase) {.async.} =
try:
await allFutures(request)
if request.failed():
node.status = RestBeaconNodeStatus.Offline
except CancelledError as exc:
node.status = RestBeaconNodeStatus.Offline
await cancelAndWait(request)
proc lazyWait(nodes: seq[BeaconNodeServerRef], requests: seq[FutureBase],
timerFut: Future[void]) {.async.} =
doAssert(len(nodes) == len(requests))
if len(nodes) == 0:
return
var futures: seq[Future[void]]
for index in 0 ..< len(requests):
futures.add(lazyWaiter(nodes[index], requests[index]))
if not(isNil(timerFut)):
await allFutures(futures) or timerFut
if timerFut.finished():
@ -151,6 +169,8 @@ template firstSuccessParallel*(
status =
try:
body2
except CancelledError as exc:
raise exc
except CatchableError:
raiseAssert("Response handler must not raise exceptions")
@ -158,6 +178,7 @@ template firstSuccessParallel*(
if apiResponse.isOk() and (status == RestBeaconNodeStatus.Online):
retRes = apiResponse
resultReady = true
asyncSpawn lazyWait(pendingNodes, pendingRequests, timerFut)
break
else:
# Timeout exceeded first.
@ -2026,3 +2047,43 @@ proc getValidatorsActivity*(
if activity[index].active:
activities[index].active = true
return GetValidatorsActivityResponse(data: activities)
proc prepareBeaconProposer*(
vc: ValidatorClientRef,
data: seq[PrepareBeaconProposer]
): Future[int] {.async.} =
logScope: request = "prepareBeaconProposer"
let resp = vc.onceToAll(RestPlainResponse, SlotDuration,
{BeaconNodeRole.BlockProposalPublish},
prepareBeaconProposer(it, data))
if len(resp.data) == 0:
# We did not get any response from beacon nodes.
case resp.status
of ApiOperation.Success:
# This should not be happened, there should be present at least one
# successfull response.
return 0
of ApiOperation.Timeout:
debug "Unable to perform beacon proposer preparation request in time",
timeout = SlotDuration
return 0
of ApiOperation.Interrupt:
debug "Beacon proposer's preparation request was interrupted"
return 0
of ApiOperation.Failure:
debug "Unexpected error happened while preparing beacon proposers"
return 0
else:
var count = 0
for apiResponse in resp.data:
if apiResponse.data.isErr():
debug "Unable to perform beacon proposer preparation request",
endpoint = apiResponse.node, error = apiResponse.data.error()
else:
let response = apiResponse.data.get()
if response.status == 200:
inc(count)
else:
debug "Beacon proposer preparation failed", status = response.status,
endpoint = apiResponse.node
return count

View File

@ -13,7 +13,8 @@ import
metrics, metrics/chronos_httpserver,
".."/spec/datatypes/[phase0, altair],
".."/spec/[eth2_merkleization, helpers, signatures, validator],
".."/spec/eth2_apis/[eth2_rest_serialization, rest_beacon_client],
".."/spec/eth2_apis/[eth2_rest_serialization, rest_beacon_client,
dynamic_fee_recipients],
".."/validators/[keystore_management, validator_pool, slashing_protection],
".."/[conf, beacon_clock, version, nimbus_binary_common]
@ -22,7 +23,8 @@ export
nimbus_binary_common, version, conf, options, tables, results, base10,
byteutils, presto_client, eth2_rest_serialization, rest_beacon_client,
phase0, altair, helpers, signatures, validator, eth2_merkleization,
beacon_clock, keystore_management, slashing_protection, validator_pool
beacon_clock, keystore_management, slashing_protection, validator_pool,
dynamic_fee_recipients
const
SYNC_TOLERANCE* = 4'u64
@ -166,12 +168,14 @@ type
forks*: seq[Fork]
forksAvailable*: AsyncEvent
nodesAvailable*: AsyncEvent
indicesAvailable*: AsyncEvent
gracefulExit*: AsyncEvent
attesters*: AttesterMap
proposers*: ProposerMap
syncCommitteeDuties*: SyncCommitteeDutiesMap
beaconGenesis*: RestGenesis
proposerTasks*: Table[Slot, seq[ProposerTask]]
dynamicFeeRecipientsStore*: ref DynamicFeeRecipientsStore
rng*: ref HmacDrbgContext
ValidatorClientRef* = ref ValidatorClient
@ -275,6 +279,11 @@ chronicles.expandIt(RestAttesterDuty):
committees_at_slot = it.committees_at_slot
validator_committee_index = it.validator_committee_index
chronicles.expandIt(SyncCommitteeDuty):
pubkey = shortLog(it.pubkey)
validator_index = it.validator_index
validator_sync_committee_index = it.validator_sync_committee_index
proc stop*(csr: ClientServiceRef) {.async.} =
debug "Stopping service", service = csr.name
if csr.state == ServiceState.Running:
@ -420,19 +429,6 @@ proc getSyncCommitteeDutiesForSlot*(vc: ValidatorClientRef,
res.add(duty[])
res
proc removeOldSyncPeriodDuties*(vc: ValidatorClientRef,
slot: Slot) =
if slot.is_sync_committee_period:
let epoch = slot.epoch()
var prunedDuties = SyncCommitteeDutiesMap()
for key, item in vc.syncCommitteeDuties:
var curPeriodDuties = EpochSyncDuties()
for epochKey, epochDuty in item.duties:
if epochKey >= epoch:
curPeriodDuties.duties[epochKey] = epochDuty
prunedDuties[key] = curPeriodDuties
vc.syncCommitteeDuties = prunedDuties
proc getDurationToNextAttestation*(vc: ValidatorClientRef,
slot: Slot): string =
var minSlot = FAR_FUTURE_SLOT
@ -548,11 +544,16 @@ proc addDoppelganger*(vc: ValidatorClientRef,
lastAttempt: DoppelgangerAttempt.None,
status: DoppelgangerStatus.Passed)
else:
DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64,
lastAttempt: DoppelgangerAttempt.None,
status: DoppelgangerStatus.Checking)
if validator.activationEpoch.isSome() and
(validator.activationEpoch.get() >= startEpoch):
DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64,
lastAttempt: DoppelgangerAttempt.None,
status: DoppelgangerStatus.Passed)
else:
DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64,
lastAttempt: DoppelgangerAttempt.None,
status: DoppelgangerStatus.Checking)
res = vc.doppelgangerDetection.validators.hasKeyOrPut(vindex, state)
if res:
exist.add(validatorLog(validator.pubkey, vindex))
else:
@ -677,15 +678,12 @@ proc doppelgangerCheck*(vc: ValidatorClientRef,
if validator.index.isNone():
false
else:
if validator.startSlot > GENESIS_SLOT:
let
vindex = validator.index.get()
default = DoppelgangerState(status: DoppelgangerStatus.None)
state = vc.doppelgangerDetection.validators.getOrDefault(vindex,
default)
state.status == DoppelgangerStatus.Passed
else:
true
let
vindex = validator.index.get()
default = DoppelgangerState(status: DoppelgangerStatus.None)
state = vc.doppelgangerDetection.validators.getOrDefault(vindex,
default)
state.status == DoppelgangerStatus.Passed
else:
true
@ -698,7 +696,6 @@ proc doppelgangerFilter*(
vc: ValidatorClientRef,
duties: openArray[DutyAndProof]
): tuple[filtered: seq[DutyAndProof], skipped: seq[string]] =
let defstate = DoppelgangerState(status: DoppelgangerStatus.None)
var
pending: seq[string]
ready: seq[DutyAndProof]
@ -711,3 +708,31 @@ proc doppelgangerFilter*(
else:
pending.add(validatorLog(vkey, vindex))
(ready, pending)
proc getFeeRecipient*(vc: ValidatorClientRef, pubkey: ValidatorPubKey,
validatorIdx: ValidatorIndex,
epoch: Epoch): Opt[Eth1Address] =
let dynamicRecipient = vc.dynamicFeeRecipientsStore[].getDynamicFeeRecipient(
validatorIdx, epoch)
if dynamicRecipient.isSome():
Opt.some(dynamicRecipient.get())
else:
let staticRecipient = getSuggestedFeeRecipient(
vc.config.validatorsDir, pubkey, vc.config.defaultFeeRecipient)
if staticRecipient.isOk():
Opt.some(staticRecipient.get())
else:
Opt.none(Eth1Address)
proc prepareProposersList*(vc: ValidatorClientRef,
epoch: Epoch): seq[PrepareBeaconProposer] =
var res: seq[PrepareBeaconProposer]
for validator in vc.attachedValidators[].items():
if validator.index.isSome():
let
index = validator.index.get()
feeRecipient = vc.getFeeRecipient(validator.pubkey, index, epoch)
if feeRecipient.isSome():
res.add(PrepareBeaconProposer(validator_index: index,
fee_recipient: feeRecipient.get()))
res

View File

@ -16,7 +16,8 @@ logScope: service = ServiceName
type
DutiesServiceLoop* = enum
AttesterLoop, ProposerLoop, IndicesLoop, SyncCommitteeLoop
AttesterLoop, ProposerLoop, IndicesLoop, SyncCommitteeLoop,
ProposerPreparationLoop
chronicles.formatIt(DutiesServiceLoop):
case it
@ -24,6 +25,7 @@ chronicles.formatIt(DutiesServiceLoop):
of ProposerLoop: "proposer_loop"
of IndicesLoop: "index_loop"
of SyncCommitteeLoop: "sync_committee_loop"
of ProposerPreparationLoop: "proposer_prepare_loop"
proc checkDuty(duty: RestAttesterDuty): bool =
(duty.committee_length <= MAX_VALIDATORS_PER_COMMITTEE) and
@ -88,6 +90,7 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} =
missing.add(validatorLog(item.validator.pubkey, item.index))
else:
validator.index = Opt.some(item.index)
validator.activationEpoch = Opt.some(item.validator.activation_epoch)
updated.add(validatorLog(item.validator.pubkey, item.index))
list.add(validator)
@ -96,6 +99,7 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} =
updated_validators = len(updated)
trace "Validator indices update dump", missing_validators = missing,
updated_validators = updated
vc.indicesAvailable.fire()
vc.addDoppelganger(list)
proc pollForAttesterDuties*(vc: ValidatorClientRef,
@ -229,6 +233,18 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef,
return len(addOrReplaceItems)
proc pruneSyncCommitteeDuties*(vc: ValidatorClientRef, slot: Slot) =
if slot.is_sync_committee_period():
var newSyncCommitteeDuties: SyncCommitteeDutiesMap
let epoch = slot.epoch()
for key, item in vc.syncCommitteeDuties:
var currentPeriodDuties = EpochSyncDuties()
for epochKey, epochDuty in item.duties:
if epochKey >= epoch:
currentPeriodDuties.duties[epochKey] = epochDuty
newSyncCommitteeDuties[key] = currentPeriodDuties
vc.syncCommitteeDuties = newSyncCommitteeDuties
proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef,
epoch: Epoch): Future[int] {.async.} =
let validatorIndices = toSeq(vc.attachedValidators[].indices())
@ -268,7 +284,8 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef,
block:
var res: seq[SyncCommitteeDuty]
for duty in filteredDuties:
for validatorSyncCommitteeIndex in duty.validator_sync_committee_indices:
for validatorSyncCommitteeIndex in
duty.validator_sync_committee_indices:
res.add(SyncCommitteeDuty(
pubkey: duty.pubkey,
validator_index: duty.validator_index,
@ -281,12 +298,20 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef,
let addOrReplaceItems =
block:
var alreadyWarned = false
var res: seq[tuple[epoch: Epoch, duty: SyncCommitteeDuty]]
for duty in relevantDuties:
let map = vc.syncCommitteeDuties.getOrDefault(duty.pubkey)
let epochDuty = map.duties.getOrDefault(epoch, DefaultSyncDutyAndProof)
info "Received new sync committee duty", duty, epoch
res.add((epoch, duty))
if epochDuty.isDefault():
info "Received new sync committee duty", duty, epoch
res.add((epoch, duty))
else:
if epochDuty.data != duty:
if not(alreadyWarned):
info "Sync committee duties re-organization", duty, epoch
alreadyWarned = true
res.add((epoch, duty))
res
if len(addOrReplaceItems) > 0:
@ -332,7 +357,8 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef,
SyncDutyAndProof.init(item.epoch, item.duty,
none[ValidatorSig]())
var validatorDuties = vc.syncCommitteeDuties.getOrDefault(item.duty.pubkey)
var validatorDuties =
vc.syncCommitteeDuties.getOrDefault(item.duty.pubkey)
validatorDuties.duties[item.epoch] = dap
vc.syncCommitteeDuties[item.duty.pubkey] = validatorDuties
@ -448,6 +474,8 @@ proc pollForSyncCommitteeDuties* (vc: ValidatorClientRef) {.async.} =
if not(res):
error "Failed to subscribe validators"
vc.pruneSyncCommitteeDuties(currentSlot)
proc pruneBeaconProposers(vc: ValidatorClientRef, epoch: Epoch) =
var proposers: ProposerMap
for epochKey, data in vc.proposers:
@ -492,6 +520,37 @@ proc pollForBeaconProposers*(vc: ValidatorClientRef) {.async.} =
vc.pruneBeaconProposers(currentEpoch)
proc prepareBeaconProposers*(service: DutiesServiceRef) {.async.} =
let vc = service.client
let sres = vc.getCurrentSlot()
if sres.isSome():
let
currentSlot = sres.get()
currentEpoch = currentSlot.epoch()
proposers = vc.prepareProposersList(currentEpoch)
if len(proposers) > 0:
let count =
try:
await prepareBeaconProposer(vc, proposers)
except ValidatorApiError as exc:
warn "Unable to prepare beacon proposers", slot = currentSlot,
epoch = currentEpoch, err_name = exc.name,
err_msg = exc.msg
0
except CancelledError as exc:
debug "Beacon proposer preparation processing was interrupted"
raise exc
except CatchableError as exc:
error "Unexpected error occured while preparing beacon proposers",
slot = currentSlot, epoch = currentEpoch, err_name = exc.name,
err_msg = exc.msg
0
debug "Beacon proposers prepared",
validators_count = vc.attachedValidators[].count(),
proposers_count = len(proposers),
prepared_count = count
proc waitForNextSlot(service: DutiesServiceRef,
serviceLoop: DutiesServiceLoop) {.async.} =
let vc = service.client
@ -524,7 +583,15 @@ proc validatorIndexLoop(service: DutiesServiceRef) {.async.} =
await vc.pollForValidatorIndices()
await service.waitForNextSlot(IndicesLoop)
proc syncCommitteeeDutiesLoop(service: DutiesServiceRef) {.async.} =
proc proposerPreparationsLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
debug "Beacon proposer preparation loop waiting for validator indices update"
await vc.indicesAvailable.wait()
while true:
await service.prepareBeaconProposers()
await service.waitForNextSlot(ProposerPreparationLoop)
proc syncCommitteeDutiesLoop(service: DutiesServiceRef) {.async.} =
let vc = service.client
debug "Sync committee duties loop waiting for fork schedule update"
@ -556,7 +623,8 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
attestFut = service.attesterDutiesLoop()
proposeFut = service.proposerDutiesLoop()
indicesFut = service.validatorIndexLoop()
syncFut = service.syncCommitteeeDutiesLoop()
syncFut = service.syncCommitteeDutiesLoop()
prepareFut = service.proposerPreparationsLoop()
while true:
# This loop could look much more nicer/better, when
@ -564,12 +632,15 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
# become safe to combine loops, breaks and exception handlers.
let breakLoop =
try:
discard await race(attestFut, proposeFut, indicesFut, syncFut)
discard await race(attestFut, proposeFut, indicesFut, syncFut,
prepareFut)
checkAndRestart(AttesterLoop, attestFut, service.attesterDutiesLoop())
checkAndRestart(ProposerLoop, proposeFut, service.proposerDutiesLoop())
checkAndRestart(IndicesLoop, indicesFut, service.validatorIndexLoop())
checkAndRestart(SyncCommitteeLoop,
syncFut, service.syncCommitteeeDutiesLoop())
checkAndRestart(SyncCommitteeLoop, syncFut,
service.syncCommitteeDutiesLoop())
checkAndRestart(ProposerPreparationLoop, prepareFut,
service.proposerPreparationsLoop())
false
except CancelledError:
debug "Service interrupted"
@ -582,6 +653,8 @@ proc mainLoop(service: DutiesServiceRef) {.async.} =
pending.add(indicesFut.cancelAndWait())
if not(syncFut.finished()):
pending.add(syncFut.cancelAndWait())
if not(prepareFut.finished()):
pending.add(prepareFut.cancelAndWait())
await allFutures(pending)
true
except CatchableError as exc:

View File

@ -360,10 +360,9 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
await service.produceAndPublishContributions(slot, beaconBlockRoot, duties)
proc spawnSyncCommitteeTasks(service: SyncCommitteeServiceRef, slot: Slot) =
let vc = service.client
removeOldSyncPeriodDuties(vc, slot)
let duties = vc.getSyncCommitteeDutiesForSlot(slot + 1)
let
vc = service.client
duties = vc.getSyncCommitteeDutiesForSlot(slot + 1)
asyncSpawn service.publishSyncMessagesAndContributions(slot, duties)

View File

@ -55,6 +55,9 @@ type
# assumed that a valid index is stored here!
index*: Opt[ValidatorIndex]
# Epoch when validator activated.
activationEpoch*: Opt[Epoch]
# Cache the latest slot signature - the slot signature is used to determine
# if the validator will be aggregating (in the near future)
slotSignature*: Opt[tuple[slot: Slot, signature: ValidatorSig]]