VC: Obtain randao signature before slot proposal. (#5490)

* Randao calculation caching for VC implementation.

* Add time monitoring for randao signatures process.

* Add delay calculation.

* Address review comments.

* Address review comments.
This commit is contained in:
Eugene Kabanov 2023-11-04 09:14:14 +02:00 committed by GitHub
parent 29fe958908
commit 8cec3af61c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 94 additions and 27 deletions

View File

@ -32,6 +32,9 @@ type
blockRoot*: Eth2Digest blockRoot*: Eth2Digest
data*: ForkedBlindedBeaconBlock data*: ForkedBlindedBeaconBlock
proc proposeBlock(vc: ValidatorClientRef, slot: Slot,
proposerKey: ValidatorPubKey) {.async.}
proc produceBlock( proc produceBlock(
vc: ValidatorClientRef, vc: ValidatorClientRef,
currentSlot, slot: Slot, currentSlot, slot: Slot,
@ -86,7 +89,6 @@ proc produceBlock(
data: ForkedBeaconBlock.init(blck), data: ForkedBeaconBlock.init(blck),
blobsOpt: Opt.some(blobs))) blobsOpt: Opt.some(blobs)))
proc produceBlindedBlock( proc produceBlindedBlock(
vc: ValidatorClientRef, vc: ValidatorClientRef,
currentSlot, slot: Slot, currentSlot, slot: Slot,
@ -125,6 +127,58 @@ proc lazyWait[T](fut: Future[T]) {.async.} =
except CatchableError: except CatchableError:
discard discard
proc prepareRandao(vc: ValidatorClientRef, slot: Slot,
proposerKey: ValidatorPubKey) {.async.} =
if slot == GENESIS_SLOT:
return
let
destSlot = slot - 1'u64
destOffset = TimeDiff(nanoseconds: NANOSECONDS_PER_SLOT.int64 div 2)
deadline = destSlot.start_beacon_time() + destOffset
epoch = slot.epoch()
# We going to wait to T - (T / 4 * 2), where T is proposer's
# duty slot.
currentSlot = (await vc.checkedWaitForSlot(destSlot, destOffset,
false)).valueOr:
debug "Unable to perform RANDAO signature preparation because of " &
"system time failure"
return
validator =
vc.getValidatorForDuties(proposerKey, slot, true).valueOr: return
if currentSlot <= destSlot:
# We do not need result, because we want it to be cached.
let
start = Moment.now()
genesisRoot = vc.beaconGenesis.genesis_validators_root
fork = vc.forkAtEpoch(epoch)
rsig = await validator.getEpochSignature(fork, genesisRoot, epoch)
timeElapsed = Moment.now() - start
if rsig.isErr():
debug "Unable to prepare RANDAO signature", epoch = epoch,
validator = shortLog(validator), elapsed_time = timeElapsed,
current_slot = currentSlot, destination_slot = destSlot,
delay = vc.getDelay(deadline)
else:
debug "RANDAO signature has been prepared", epoch = epoch,
validator = shortLog(validator), elapsed_time = timeElapsed,
current_slot = currentSlot, destination_slot = destSlot,
delay = vc.getDelay(deadline)
else:
debug "RANDAO signature preparation timed out", epoch = epoch,
validator = shortLog(validator),
current_slot = currentSlot, destination_slot = destSlot,
delay = vc.getDelay(deadline)
proc spawnProposalTask(vc: ValidatorClientRef,
duty: RestProposerDuty): ProposerTask =
ProposerTask(
randaoFut: prepareRandao(vc, duty.slot, duty.pubkey),
proposeFut: proposeBlock(vc, duty.slot, duty.pubkey),
duty: duty
)
proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot, proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
validator: AttachedValidator) {.async.} = validator: AttachedValidator) {.async.} =
let let
@ -146,19 +200,20 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
debug "Publishing block", delay = vc.getDelay(slot.block_deadline()), debug "Publishing block", delay = vc.getDelay(slot.block_deadline()),
genesis_root = genesisRoot, genesis_root = genesisRoot,
graffiti = graffiti, fork = fork graffiti = graffiti, fork = fork
let randaoReveal =
let
randaoReveal =
try: try:
let res = await validator.getEpochSignature(fork, genesisRoot, slot.epoch) (await validator.getEpochSignature(fork, genesisRoot,
if res.isErr(): slot.epoch())).valueOr:
warn "Unable to generate randao reveal using remote signer", warn "Unable to generate RANDAO reveal using remote signer",
reason = res.error() reason = error
return return
res.get()
except CancelledError as exc: except CancelledError as exc:
debug "Randao reveal production has been interrupted" debug "RANDAO reveal production has been interrupted"
raise exc raise exc
except CatchableError as exc: except CatchableError as exc:
error "An unexpected error occurred while receiving randao data", error "An unexpected error occurred while receiving RANDAO data",
error_name = exc.name, error_msg = exc.msg error_name = exc.name, error_msg = exc.msg
return return
@ -408,11 +463,6 @@ proc proposeBlock(vc: ValidatorClientRef, slot: Slot,
error "Unexpected error encountered while proposing block", error "Unexpected error encountered while proposing block",
slot = slot, validator = shortLog(validator) slot = slot, validator = shortLog(validator)
proc spawnProposalTask(vc: ValidatorClientRef,
duty: RestProposerDuty): ProposerTask =
let future = proposeBlock(vc, duty.slot, duty.pubkey)
ProposerTask(future: future, duty: duty)
proc contains(data: openArray[RestProposerDuty], task: ProposerTask): bool = proc contains(data: openArray[RestProposerDuty], task: ProposerTask): bool =
for item in data: for item in data:
if (item.pubkey == task.duty.pubkey) and (item.slot == task.duty.slot): if (item.pubkey == task.duty.pubkey) and (item.slot == task.duty.slot):
@ -462,13 +512,14 @@ proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch,
for task in epochDuties.duties: for task in epochDuties.duties:
if task notin duties: if task notin duties:
# Task is no more relevant, so cancel it. # Task is no more relevant, so cancel it.
debug "Cancelling running proposal duty task", debug "Cancelling running proposal duty tasks",
slot = task.duty.slot, slot = task.duty.slot,
validator = shortLog(task.duty.pubkey) validator = shortLog(task.duty.pubkey)
task.future.cancelSoon() task.proposeFut.cancelSoon()
task.randaoFut.cancelSoon()
else: else:
# If task is already running for proper slot, we keep it alive. # If task is already running for proper slot, we keep it alive.
debug "Keep running previous proposal duty task", debug "Keep running previous proposal duty tasks",
slot = task.duty.slot, slot = task.duty.slot,
validator = shortLog(task.duty.pubkey) validator = shortLog(task.duty.pubkey)
res.add(task) res.add(task)
@ -783,8 +834,10 @@ proc mainLoop(service: BlockServiceRef) {.async.} =
var res: seq[FutureBase] var res: seq[FutureBase]
for epoch, data in vc.proposers.pairs(): for epoch, data in vc.proposers.pairs():
for duty in data.duties.items(): for duty in data.duties.items():
if not(duty.future.finished()): if not(duty.proposeFut.finished()):
res.add(duty.future.cancelAndWait()) res.add(duty.proposeFut.cancelAndWait())
if not(duty.randaoFut.finished()):
res.add(duty.randaoFut.cancelAndWait())
await noCancel allFutures(res) await noCancel allFutures(res)
proc init*(t: typedesc[BlockServiceRef], proc init*(t: typedesc[BlockServiceRef],

View File

@ -95,7 +95,8 @@ type
ProposerTask* = object ProposerTask* = object
duty*: RestProposerDuty duty*: RestProposerDuty
future*: Future[void] proposeFut*: Future[void]
randaoFut*: Future[void]
ProposedData* = object ProposedData* = object
epoch*: Epoch epoch*: Epoch

View File

@ -66,6 +66,10 @@ type
# if the validator will be aggregating (in the near future) # if the validator will be aggregating (in the near future)
slotSignature*: Opt[tuple[slot: Slot, signature: ValidatorSig]] slotSignature*: Opt[tuple[slot: Slot, signature: ValidatorSig]]
# Cache the latest epoch signature - the epoch signature is used for block
# proposing.
epochSignature*: Opt[tuple[epoch: Epoch, signature: ValidatorSig]]
# For the external payload builder; each epoch, the external payload # For the external payload builder; each epoch, the external payload
# builder should be informed of current validators # builder should be informed of current validators
externalBuilderRegistration*: Opt[SignedValidatorRegistrationV1] externalBuilderRegistration*: Opt[SignedValidatorRegistrationV1]
@ -746,7 +750,10 @@ proc getContributionAndProofSignature*(v: AttachedValidator, fork: Fork,
proc getEpochSignature*(v: AttachedValidator, fork: Fork, proc getEpochSignature*(v: AttachedValidator, fork: Fork,
genesis_validators_root: Eth2Digest, epoch: Epoch genesis_validators_root: Eth2Digest, epoch: Epoch
): Future[SignatureResult] {.async.} = ): Future[SignatureResult] {.async.} =
return if v.epochSignature.isSome and v.epochSignature.get.epoch == epoch:
return SignatureResult.ok(v.epochSignature.get.signature)
let signature =
case v.kind case v.kind
of ValidatorKind.Local: of ValidatorKind.Local:
SignatureResult.ok(get_epoch_signature( SignatureResult.ok(get_epoch_signature(
@ -757,6 +764,12 @@ proc getEpochSignature*(v: AttachedValidator, fork: Fork,
fork, genesis_validators_root, epoch) fork, genesis_validators_root, epoch)
await v.signData(request) await v.signData(request)
if signature.isErr:
return signature
v.epochSignature = Opt.some((epoch, signature.get))
signature
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/phase0/validator.md#aggregation-selection # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.1/specs/phase0/validator.md#aggregation-selection
proc getSlotSignature*(v: AttachedValidator, fork: Fork, proc getSlotSignature*(v: AttachedValidator, fork: Fork,
genesis_validators_root: Eth2Digest, slot: Slot genesis_validators_root: Eth2Digest, slot: Slot