diff --git a/beacon_chain/nimbus_validator_client.nim b/beacon_chain/nimbus_validator_client.nim index 405214506..e96b177ec 100644 --- a/beacon_chain/nimbus_validator_client.nim +++ b/beacon_chain/nimbus_validator_client.nim @@ -211,6 +211,8 @@ proc new*(T: type ValidatorClientRef, nodesAvailable: newAsyncEvent(), forksAvailable: newAsyncEvent(), gracefulExit: newAsyncEvent(), + indicesAvailable: newAsyncEvent(), + dynamicFeeRecipientsStore: newClone(DynamicFeeRecipientsStore.init()), sigintHandleFut: waitSignal(SIGINT), sigtermHandleFut: waitSignal(SIGTERM) ) @@ -222,7 +224,9 @@ proc new*(T: type ValidatorClientRef, graffitiBytes: config.graffiti.get(defaultGraffitiBytes()), nodesAvailable: newAsyncEvent(), forksAvailable: newAsyncEvent(), + indicesAvailable: newAsyncEvent(), gracefulExit: newAsyncEvent(), + dynamicFeeRecipientsStore: newClone(DynamicFeeRecipientsStore.init()), sigintHandleFut: newFuture[void]("sigint_placeholder"), sigtermHandleFut: newFuture[void]("sigterm_placeholder") ) diff --git a/beacon_chain/validator_client/api.nim b/beacon_chain/validator_client/api.nim index 1203deb39..8d7bb3baa 100644 --- a/beacon_chain/validator_client/api.nim +++ b/beacon_chain/validator_client/api.nim @@ -45,7 +45,25 @@ proc `$`*(strategy: ApiStrategyKind): string = of ApiStrategyKind.Priority: "priority" -proc lazyWait(futures: seq[FutureBase], timerFut: Future[void]) {.async.} = +proc lazyWaiter(node: BeaconNodeServerRef, request: FutureBase) {.async.} = + try: + await allFutures(request) + if request.failed(): + node.status = RestBeaconNodeStatus.Offline + except CancelledError as exc: + node.status = RestBeaconNodeStatus.Offline + await cancelAndWait(request) + +proc lazyWait(nodes: seq[BeaconNodeServerRef], requests: seq[FutureBase], + timerFut: Future[void]) {.async.} = + doAssert(len(nodes) == len(requests)) + if len(nodes) == 0: + return + + var futures: seq[Future[void]] + for index in 0 ..< len(requests): + futures.add(lazyWaiter(nodes[index], requests[index])) + if not(isNil(timerFut)): await allFutures(futures) or timerFut if timerFut.finished(): @@ -151,6 +169,8 @@ template firstSuccessParallel*( status = try: body2 + except CancelledError as exc: + raise exc except CatchableError: raiseAssert("Response handler must not raise exceptions") @@ -158,6 +178,7 @@ template firstSuccessParallel*( if apiResponse.isOk() and (status == RestBeaconNodeStatus.Online): retRes = apiResponse resultReady = true + asyncSpawn lazyWait(pendingNodes, pendingRequests, timerFut) break else: # Timeout exceeded first. @@ -2026,3 +2047,43 @@ proc getValidatorsActivity*( if activity[index].active: activities[index].active = true return GetValidatorsActivityResponse(data: activities) + +proc prepareBeaconProposer*( + vc: ValidatorClientRef, + data: seq[PrepareBeaconProposer] + ): Future[int] {.async.} = + logScope: request = "prepareBeaconProposer" + let resp = vc.onceToAll(RestPlainResponse, SlotDuration, + {BeaconNodeRole.BlockProposalPublish}, + prepareBeaconProposer(it, data)) + if len(resp.data) == 0: + # We did not get any response from beacon nodes. + case resp.status + of ApiOperation.Success: + # This should not be happened, there should be present at least one + # successfull response. + return 0 + of ApiOperation.Timeout: + debug "Unable to perform beacon proposer preparation request in time", + timeout = SlotDuration + return 0 + of ApiOperation.Interrupt: + debug "Beacon proposer's preparation request was interrupted" + return 0 + of ApiOperation.Failure: + debug "Unexpected error happened while preparing beacon proposers" + return 0 + else: + var count = 0 + for apiResponse in resp.data: + if apiResponse.data.isErr(): + debug "Unable to perform beacon proposer preparation request", + endpoint = apiResponse.node, error = apiResponse.data.error() + else: + let response = apiResponse.data.get() + if response.status == 200: + inc(count) + else: + debug "Beacon proposer preparation failed", status = response.status, + endpoint = apiResponse.node + return count diff --git a/beacon_chain/validator_client/common.nim b/beacon_chain/validator_client/common.nim index 29fbf2ecf..7dbb25c97 100644 --- a/beacon_chain/validator_client/common.nim +++ b/beacon_chain/validator_client/common.nim @@ -13,7 +13,8 @@ import metrics, metrics/chronos_httpserver, ".."/spec/datatypes/[phase0, altair], ".."/spec/[eth2_merkleization, helpers, signatures, validator], - ".."/spec/eth2_apis/[eth2_rest_serialization, rest_beacon_client], + ".."/spec/eth2_apis/[eth2_rest_serialization, rest_beacon_client, + dynamic_fee_recipients], ".."/validators/[keystore_management, validator_pool, slashing_protection], ".."/[conf, beacon_clock, version, nimbus_binary_common] @@ -22,7 +23,8 @@ export nimbus_binary_common, version, conf, options, tables, results, base10, byteutils, presto_client, eth2_rest_serialization, rest_beacon_client, phase0, altair, helpers, signatures, validator, eth2_merkleization, - beacon_clock, keystore_management, slashing_protection, validator_pool + beacon_clock, keystore_management, slashing_protection, validator_pool, + dynamic_fee_recipients const SYNC_TOLERANCE* = 4'u64 @@ -166,12 +168,14 @@ type forks*: seq[Fork] forksAvailable*: AsyncEvent nodesAvailable*: AsyncEvent + indicesAvailable*: AsyncEvent gracefulExit*: AsyncEvent attesters*: AttesterMap proposers*: ProposerMap syncCommitteeDuties*: SyncCommitteeDutiesMap beaconGenesis*: RestGenesis proposerTasks*: Table[Slot, seq[ProposerTask]] + dynamicFeeRecipientsStore*: ref DynamicFeeRecipientsStore rng*: ref HmacDrbgContext ValidatorClientRef* = ref ValidatorClient @@ -275,6 +279,11 @@ chronicles.expandIt(RestAttesterDuty): committees_at_slot = it.committees_at_slot validator_committee_index = it.validator_committee_index +chronicles.expandIt(SyncCommitteeDuty): + pubkey = shortLog(it.pubkey) + validator_index = it.validator_index + validator_sync_committee_index = it.validator_sync_committee_index + proc stop*(csr: ClientServiceRef) {.async.} = debug "Stopping service", service = csr.name if csr.state == ServiceState.Running: @@ -420,19 +429,6 @@ proc getSyncCommitteeDutiesForSlot*(vc: ValidatorClientRef, res.add(duty[]) res -proc removeOldSyncPeriodDuties*(vc: ValidatorClientRef, - slot: Slot) = - if slot.is_sync_committee_period: - let epoch = slot.epoch() - var prunedDuties = SyncCommitteeDutiesMap() - for key, item in vc.syncCommitteeDuties: - var curPeriodDuties = EpochSyncDuties() - for epochKey, epochDuty in item.duties: - if epochKey >= epoch: - curPeriodDuties.duties[epochKey] = epochDuty - prunedDuties[key] = curPeriodDuties - vc.syncCommitteeDuties = prunedDuties - proc getDurationToNextAttestation*(vc: ValidatorClientRef, slot: Slot): string = var minSlot = FAR_FUTURE_SLOT @@ -548,11 +544,16 @@ proc addDoppelganger*(vc: ValidatorClientRef, lastAttempt: DoppelgangerAttempt.None, status: DoppelgangerStatus.Passed) else: - DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64, - lastAttempt: DoppelgangerAttempt.None, - status: DoppelgangerStatus.Checking) + if validator.activationEpoch.isSome() and + (validator.activationEpoch.get() >= startEpoch): + DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64, + lastAttempt: DoppelgangerAttempt.None, + status: DoppelgangerStatus.Passed) + else: + DoppelgangerState(startEpoch: startEpoch, epochsCount: 0'u64, + lastAttempt: DoppelgangerAttempt.None, + status: DoppelgangerStatus.Checking) res = vc.doppelgangerDetection.validators.hasKeyOrPut(vindex, state) - if res: exist.add(validatorLog(validator.pubkey, vindex)) else: @@ -677,15 +678,12 @@ proc doppelgangerCheck*(vc: ValidatorClientRef, if validator.index.isNone(): false else: - if validator.startSlot > GENESIS_SLOT: - let - vindex = validator.index.get() - default = DoppelgangerState(status: DoppelgangerStatus.None) - state = vc.doppelgangerDetection.validators.getOrDefault(vindex, - default) - state.status == DoppelgangerStatus.Passed - else: - true + let + vindex = validator.index.get() + default = DoppelgangerState(status: DoppelgangerStatus.None) + state = vc.doppelgangerDetection.validators.getOrDefault(vindex, + default) + state.status == DoppelgangerStatus.Passed else: true @@ -698,7 +696,6 @@ proc doppelgangerFilter*( vc: ValidatorClientRef, duties: openArray[DutyAndProof] ): tuple[filtered: seq[DutyAndProof], skipped: seq[string]] = - let defstate = DoppelgangerState(status: DoppelgangerStatus.None) var pending: seq[string] ready: seq[DutyAndProof] @@ -711,3 +708,31 @@ proc doppelgangerFilter*( else: pending.add(validatorLog(vkey, vindex)) (ready, pending) + +proc getFeeRecipient*(vc: ValidatorClientRef, pubkey: ValidatorPubKey, + validatorIdx: ValidatorIndex, + epoch: Epoch): Opt[Eth1Address] = + let dynamicRecipient = vc.dynamicFeeRecipientsStore[].getDynamicFeeRecipient( + validatorIdx, epoch) + if dynamicRecipient.isSome(): + Opt.some(dynamicRecipient.get()) + else: + let staticRecipient = getSuggestedFeeRecipient( + vc.config.validatorsDir, pubkey, vc.config.defaultFeeRecipient) + if staticRecipient.isOk(): + Opt.some(staticRecipient.get()) + else: + Opt.none(Eth1Address) + +proc prepareProposersList*(vc: ValidatorClientRef, + epoch: Epoch): seq[PrepareBeaconProposer] = + var res: seq[PrepareBeaconProposer] + for validator in vc.attachedValidators[].items(): + if validator.index.isSome(): + let + index = validator.index.get() + feeRecipient = vc.getFeeRecipient(validator.pubkey, index, epoch) + if feeRecipient.isSome(): + res.add(PrepareBeaconProposer(validator_index: index, + fee_recipient: feeRecipient.get())) + res diff --git a/beacon_chain/validator_client/duties_service.nim b/beacon_chain/validator_client/duties_service.nim index b8d4b5d51..ee206d255 100644 --- a/beacon_chain/validator_client/duties_service.nim +++ b/beacon_chain/validator_client/duties_service.nim @@ -16,7 +16,8 @@ logScope: service = ServiceName type DutiesServiceLoop* = enum - AttesterLoop, ProposerLoop, IndicesLoop, SyncCommitteeLoop + AttesterLoop, ProposerLoop, IndicesLoop, SyncCommitteeLoop, + ProposerPreparationLoop chronicles.formatIt(DutiesServiceLoop): case it @@ -24,6 +25,7 @@ chronicles.formatIt(DutiesServiceLoop): of ProposerLoop: "proposer_loop" of IndicesLoop: "index_loop" of SyncCommitteeLoop: "sync_committee_loop" + of ProposerPreparationLoop: "proposer_prepare_loop" proc checkDuty(duty: RestAttesterDuty): bool = (duty.committee_length <= MAX_VALIDATORS_PER_COMMITTEE) and @@ -88,6 +90,7 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} = missing.add(validatorLog(item.validator.pubkey, item.index)) else: validator.index = Opt.some(item.index) + validator.activationEpoch = Opt.some(item.validator.activation_epoch) updated.add(validatorLog(item.validator.pubkey, item.index)) list.add(validator) @@ -96,6 +99,7 @@ proc pollForValidatorIndices*(vc: ValidatorClientRef) {.async.} = updated_validators = len(updated) trace "Validator indices update dump", missing_validators = missing, updated_validators = updated + vc.indicesAvailable.fire() vc.addDoppelganger(list) proc pollForAttesterDuties*(vc: ValidatorClientRef, @@ -229,6 +233,18 @@ proc pollForAttesterDuties*(vc: ValidatorClientRef, return len(addOrReplaceItems) +proc pruneSyncCommitteeDuties*(vc: ValidatorClientRef, slot: Slot) = + if slot.is_sync_committee_period(): + var newSyncCommitteeDuties: SyncCommitteeDutiesMap + let epoch = slot.epoch() + for key, item in vc.syncCommitteeDuties: + var currentPeriodDuties = EpochSyncDuties() + for epochKey, epochDuty in item.duties: + if epochKey >= epoch: + currentPeriodDuties.duties[epochKey] = epochDuty + newSyncCommitteeDuties[key] = currentPeriodDuties + vc.syncCommitteeDuties = newSyncCommitteeDuties + proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef, epoch: Epoch): Future[int] {.async.} = let validatorIndices = toSeq(vc.attachedValidators[].indices()) @@ -268,7 +284,8 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef, block: var res: seq[SyncCommitteeDuty] for duty in filteredDuties: - for validatorSyncCommitteeIndex in duty.validator_sync_committee_indices: + for validatorSyncCommitteeIndex in + duty.validator_sync_committee_indices: res.add(SyncCommitteeDuty( pubkey: duty.pubkey, validator_index: duty.validator_index, @@ -281,12 +298,20 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef, let addOrReplaceItems = block: + var alreadyWarned = false var res: seq[tuple[epoch: Epoch, duty: SyncCommitteeDuty]] for duty in relevantDuties: let map = vc.syncCommitteeDuties.getOrDefault(duty.pubkey) let epochDuty = map.duties.getOrDefault(epoch, DefaultSyncDutyAndProof) - info "Received new sync committee duty", duty, epoch - res.add((epoch, duty)) + if epochDuty.isDefault(): + info "Received new sync committee duty", duty, epoch + res.add((epoch, duty)) + else: + if epochDuty.data != duty: + if not(alreadyWarned): + info "Sync committee duties re-organization", duty, epoch + alreadyWarned = true + res.add((epoch, duty)) res if len(addOrReplaceItems) > 0: @@ -332,7 +357,8 @@ proc pollForSyncCommitteeDuties*(vc: ValidatorClientRef, SyncDutyAndProof.init(item.epoch, item.duty, none[ValidatorSig]()) - var validatorDuties = vc.syncCommitteeDuties.getOrDefault(item.duty.pubkey) + var validatorDuties = + vc.syncCommitteeDuties.getOrDefault(item.duty.pubkey) validatorDuties.duties[item.epoch] = dap vc.syncCommitteeDuties[item.duty.pubkey] = validatorDuties @@ -448,6 +474,8 @@ proc pollForSyncCommitteeDuties* (vc: ValidatorClientRef) {.async.} = if not(res): error "Failed to subscribe validators" + vc.pruneSyncCommitteeDuties(currentSlot) + proc pruneBeaconProposers(vc: ValidatorClientRef, epoch: Epoch) = var proposers: ProposerMap for epochKey, data in vc.proposers: @@ -492,6 +520,37 @@ proc pollForBeaconProposers*(vc: ValidatorClientRef) {.async.} = vc.pruneBeaconProposers(currentEpoch) +proc prepareBeaconProposers*(service: DutiesServiceRef) {.async.} = + let vc = service.client + let sres = vc.getCurrentSlot() + if sres.isSome(): + let + currentSlot = sres.get() + currentEpoch = currentSlot.epoch() + proposers = vc.prepareProposersList(currentEpoch) + + if len(proposers) > 0: + let count = + try: + await prepareBeaconProposer(vc, proposers) + except ValidatorApiError as exc: + warn "Unable to prepare beacon proposers", slot = currentSlot, + epoch = currentEpoch, err_name = exc.name, + err_msg = exc.msg + 0 + except CancelledError as exc: + debug "Beacon proposer preparation processing was interrupted" + raise exc + except CatchableError as exc: + error "Unexpected error occured while preparing beacon proposers", + slot = currentSlot, epoch = currentEpoch, err_name = exc.name, + err_msg = exc.msg + 0 + debug "Beacon proposers prepared", + validators_count = vc.attachedValidators[].count(), + proposers_count = len(proposers), + prepared_count = count + proc waitForNextSlot(service: DutiesServiceRef, serviceLoop: DutiesServiceLoop) {.async.} = let vc = service.client @@ -524,7 +583,15 @@ proc validatorIndexLoop(service: DutiesServiceRef) {.async.} = await vc.pollForValidatorIndices() await service.waitForNextSlot(IndicesLoop) -proc syncCommitteeeDutiesLoop(service: DutiesServiceRef) {.async.} = +proc proposerPreparationsLoop(service: DutiesServiceRef) {.async.} = + let vc = service.client + debug "Beacon proposer preparation loop waiting for validator indices update" + await vc.indicesAvailable.wait() + while true: + await service.prepareBeaconProposers() + await service.waitForNextSlot(ProposerPreparationLoop) + +proc syncCommitteeDutiesLoop(service: DutiesServiceRef) {.async.} = let vc = service.client debug "Sync committee duties loop waiting for fork schedule update" @@ -556,7 +623,8 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = attestFut = service.attesterDutiesLoop() proposeFut = service.proposerDutiesLoop() indicesFut = service.validatorIndexLoop() - syncFut = service.syncCommitteeeDutiesLoop() + syncFut = service.syncCommitteeDutiesLoop() + prepareFut = service.proposerPreparationsLoop() while true: # This loop could look much more nicer/better, when @@ -564,12 +632,15 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = # become safe to combine loops, breaks and exception handlers. let breakLoop = try: - discard await race(attestFut, proposeFut, indicesFut, syncFut) + discard await race(attestFut, proposeFut, indicesFut, syncFut, + prepareFut) checkAndRestart(AttesterLoop, attestFut, service.attesterDutiesLoop()) checkAndRestart(ProposerLoop, proposeFut, service.proposerDutiesLoop()) checkAndRestart(IndicesLoop, indicesFut, service.validatorIndexLoop()) - checkAndRestart(SyncCommitteeLoop, - syncFut, service.syncCommitteeeDutiesLoop()) + checkAndRestart(SyncCommitteeLoop, syncFut, + service.syncCommitteeDutiesLoop()) + checkAndRestart(ProposerPreparationLoop, prepareFut, + service.proposerPreparationsLoop()) false except CancelledError: debug "Service interrupted" @@ -582,6 +653,8 @@ proc mainLoop(service: DutiesServiceRef) {.async.} = pending.add(indicesFut.cancelAndWait()) if not(syncFut.finished()): pending.add(syncFut.cancelAndWait()) + if not(prepareFut.finished()): + pending.add(prepareFut.cancelAndWait()) await allFutures(pending) true except CatchableError as exc: diff --git a/beacon_chain/validator_client/sync_committee_service.nim b/beacon_chain/validator_client/sync_committee_service.nim index a697dde3c..945503ae2 100644 --- a/beacon_chain/validator_client/sync_committee_service.nim +++ b/beacon_chain/validator_client/sync_committee_service.nim @@ -360,10 +360,9 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef, await service.produceAndPublishContributions(slot, beaconBlockRoot, duties) proc spawnSyncCommitteeTasks(service: SyncCommitteeServiceRef, slot: Slot) = - let vc = service.client - - removeOldSyncPeriodDuties(vc, slot) - let duties = vc.getSyncCommitteeDutiesForSlot(slot + 1) + let + vc = service.client + duties = vc.getSyncCommitteeDutiesForSlot(slot + 1) asyncSpawn service.publishSyncMessagesAndContributions(slot, duties) diff --git a/beacon_chain/validators/validator_pool.nim b/beacon_chain/validators/validator_pool.nim index 2daf1c9b4..327c3993a 100644 --- a/beacon_chain/validators/validator_pool.nim +++ b/beacon_chain/validators/validator_pool.nim @@ -55,6 +55,9 @@ type # assumed that a valid index is stored here! index*: Opt[ValidatorIndex] + # Epoch when validator activated. + activationEpoch*: Opt[Epoch] + # Cache the latest slot signature - the slot signature is used to determine # if the validator will be aggregating (in the near future) slotSignature*: Opt[tuple[slot: Slot, signature: ValidatorSig]]