# beacon_chain
# Copyright (c) 2021-2023 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import
  chronicles,
  ".."/validators/activity_metrics,
  ".."/spec/forks,
  common, api

const
  ServiceName = "block_service"

logScope: service = ServiceName

type
  PreparedBeaconBlock = object
    ## An unsigned block together with its precomputed hash tree root.
    blockRoot*: Eth2Digest
    data*: ForkedBeaconBlock

  PreparedBlindedBeaconBlock = object
    ## An unsigned blinded block together with its precomputed hash tree root.
    blockRoot*: Eth2Digest
    data*: ForkedBlindedBeaconBlock

proc produceBlock(
       vc: ValidatorClientRef,
       currentSlot, slot: Slot,
       randao_reveal: ValidatorSig,
       graffiti: GraffitiBytes,
       validator: AttachedValidator
     ): Future[Opt[PreparedBeaconBlock]] {.async.} =
  ## Requests an unsigned block for ``slot`` through ``produceBlockV2`` (using
  ## ``ApiStrategyKind.Best``) and computes its hash tree root.
  ##
  ## Returns ``Opt.none`` (after logging) when the request fails with
  ## ``ValidatorApiError`` or any other ``CatchableError``; ``CancelledError``
  ## is re-raised. ``currentSlot`` and ``validator`` are used for logging only.
  logScope:
    slot = slot
    wall_slot = currentSlot
    validator = shortLog(validator)

  let
    beaconBlock =
      try:
        await vc.produceBlockV2(slot, randao_reveal, graffiti,
                                ApiStrategyKind.Best)
      except ValidatorApiError as exc:
        # Log the message as well, matching `produceBlindedBlock`.
        warn "Unable to retrieve block data", error_msg = exc.msg,
             reason = exc.getFailureReason()
        return Opt.none(PreparedBeaconBlock)
      except CancelledError as exc:
        debug "Block data production has been interrupted"
        raise exc
      except CatchableError as exc:
        error "An unexpected error occurred while getting block data",
              error_name = exc.name, error_msg = exc.msg
        return Opt.none(PreparedBeaconBlock)
    blockRoot = withBlck(beaconBlock): hash_tree_root(blck)

  return Opt.some(PreparedBeaconBlock(blockRoot: blockRoot, data: beaconBlock))

proc produceBlindedBlock(
       vc: ValidatorClientRef,
       currentSlot, slot: Slot,
       randao_reveal: ValidatorSig,
       graffiti: GraffitiBytes,
       validator: AttachedValidator
     ): Future[Opt[PreparedBlindedBeaconBlock]] {.async.} =
  ## Requests an unsigned blinded block for ``slot`` (using
  ## ``ApiStrategyKind.Best``) and computes its hash tree root.
  ##
  ## Returns ``Opt.none`` (after logging) when the request fails with
  ## ``ValidatorApiError`` or any other ``CatchableError``; ``CancelledError``
  ## is re-raised. ``currentSlot`` and ``validator`` are used for logging only.
  logScope:
    slot = slot
    wall_slot = currentSlot
    validator = shortLog(validator)

  let
    beaconBlock =
      try:
        await vc.produceBlindedBlock(slot, randao_reveal, graffiti,
                                     ApiStrategyKind.Best)
      except ValidatorApiError as exc:
        warn "Unable to retrieve blinded block data", error_msg = exc.msg,
             reason = exc.getFailureReason()
        return Opt.none(PreparedBlindedBeaconBlock)
      except CancelledError as exc:
        debug "Blinded block data production has been interrupted"
        raise exc
      except CatchableError as exc:
        error "An unexpected error occurred while getting blinded block data",
              error_name = exc.name, error_msg = exc.msg
        return Opt.none(PreparedBlindedBeaconBlock)
    blockRoot = withBlck(beaconBlock): hash_tree_root(blck)

  return Opt.some(
    PreparedBlindedBeaconBlock(blockRoot: blockRoot, data: beaconBlock))

proc lazyWait[T](fut: Future[T]) {.async.} =
  ## Awaits ``fut`` and throws away both its result and any ``CatchableError``
  ## it raises. Used to let a no longer needed future run to completion in the
  ## background.
  try:
    discard await fut
  except CatchableError:
    discard

proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
                  validator: AttachedValidator) {.async.} =
  ## Produces, signs and publishes a block for ``slot`` on behalf of
  ## ``validator``.
  ##
  ## Steps performed:
  ## 1. Obtain the randao reveal (epoch signature) from the validator.
  ## 2. Request a normal block and, when ``payloadBuilderEnable`` is set, a
  ##    blinded block concurrently; the blinded block is preferred if it was
  ##    produced.
  ## 3. Register the proposal with slashing protection.
  ## 4. Sign the block and publish it with ``ApiStrategyKind.First``.
  ##
  ## Every failure is logged and makes the procedure return early;
  ## ``CancelledError`` is always re-raised.
  let
    genesisRoot = vc.beaconGenesis.genesis_validators_root
    graffiti =
      if vc.config.graffiti.isSome():
        vc.config.graffiti.get()
      else:
        defaultGraffitiBytes()
    fork = vc.forkAtEpoch(slot.epoch)
    vindex = validator.index.get()

  logScope:
    validator = shortLog(validator)
    validator_index = vindex
    slot = slot
    wall_slot = currentSlot

  debug "Publishing block", delay = vc.getDelay(slot.block_deadline()),
                            genesis_root = genesisRoot,
                            graffiti = graffiti, fork = fork

  let randaoReveal =
    try:
      let res = await validator.getEpochSignature(fork, genesisRoot, slot.epoch)
      if res.isErr():
        warn "Unable to generate randao reveal using remote signer",
             reason = res.error()
        return
      res.get()
    except CancelledError as exc:
      debug "Randao reveal production has been interrupted"
      raise exc
    except CatchableError as exc:
      error "An unexpected error occurred while receiving randao data",
            error_name = exc.name, error_msg = exc.msg
      return

  let beaconBlocks =
    block:
      # Both requests are started before either one is awaited, so they run
      # concurrently.
      let blindedBlockFut =
        if vc.config.payloadBuilderEnable:
          vc.produceBlindedBlock(currentSlot, slot, randaoReveal, graffiti,
                                 validator)
        else:
          nil
      let normalBlockFut = vc.produceBlock(currentSlot, slot, randaoReveal,
                                           graffiti, validator)
      let blindedBlock =
        if isNil(blindedBlockFut):
          Opt.none(PreparedBlindedBeaconBlock)
        else:
          try:
            await blindedBlockFut
          except CancelledError as exc:
            # Do not leave the normal block request running behind us.
            if not(normalBlockFut.finished()):
              await normalBlockFut.cancelAndWait()
            raise exc
          except CatchableError:
            # This should not happen, because all exceptions are already
            # handled inside of `produceBlindedBlock`.
            Opt.none(PreparedBlindedBeaconBlock)

      let normalBlock =
        if blindedBlock.isNone():
          try:
            await normalBlockFut
          except CancelledError as exc:
            raise exc
          except CatchableError:
            # This should not happen, because all exceptions are already
            # handled inside of `produceBlock`.
            Opt.none(PreparedBeaconBlock)
        else:
          # The blinded block wins; let the normal block request finish in
          # the background and ignore its outcome.
          if not(normalBlockFut.finished()):
            asyncSpawn lazyWait(normalBlockFut)
          Opt.none(PreparedBeaconBlock)

      if blindedBlock.isNone() and normalBlock.isNone():
        return

      (blindedBlock: blindedBlock, normalBlock: normalBlock)

  if beaconBlocks.blindedBlock.isSome():
    let
      preparedBlock = beaconBlocks.blindedBlock.get()
      signingRoot = compute_block_signing_root(fork, genesisRoot, slot,
                                               preparedBlock.blockRoot)
      notSlashable =
        vc.attachedValidators[].slashingProtection.registerBlock(
          vindex, validator.pubkey, slot, signingRoot)

    logScope:
      blck = shortLog(preparedBlock.data)
      block_root = shortLog(preparedBlock.blockRoot)
      signing_root = shortLog(signingRoot)

    if notSlashable.isOk():
      let
        signature =
          try:
            let res = await validator.getBlockSignature(
              fork, genesisRoot, slot, preparedBlock.blockRoot,
              preparedBlock.data)
            if res.isErr():
              warn "Unable to sign blinded block proposal using remote signer",
                   reason = res.error()
              return
            res.get()
          except CancelledError as exc:
            debug "Blinded block signature process has been interrupted"
            raise exc
          except CatchableError as exc:
            error "An unexpected error occurred while signing blinded block",
                  error_name = exc.name, error_msg = exc.msg
            return

      logScope:
        signature = shortLog(signature)

      let
        signedBlock = ForkedSignedBlindedBeaconBlock.init(
          preparedBlock.data, preparedBlock.blockRoot, signature)
        res =
          try:
            debug "Sending blinded block"
            await vc.publishBlindedBlock(signedBlock, ApiStrategyKind.First)
          except ValidatorApiError as exc:
            warn "Unable to publish blinded block",
                 reason = exc.getFailureReason()
            return
          except CancelledError as exc:
            debug "Blinded block publication has been interrupted"
            raise exc
          except CatchableError as exc:
            error "An unexpected error occurred while publishing blinded block",
                  error_name = exc.name, error_msg = exc.msg
            return

      if res:
        let delay = vc.getDelay(slot.block_deadline())
        beacon_blocks_sent.inc()
        beacon_blocks_sent_delay.observe(delay.toFloatSeconds())
        notice "Blinded block published", delay = delay
      else:
        warn "Blinded block was not accepted by beacon node"
    else:
      warn "Slashing protection activated for blinded block proposal"
  else:
    let
      preparedBlock = beaconBlocks.normalBlock.get()
      signingRoot = compute_block_signing_root(fork, genesisRoot, slot,
                                               preparedBlock.blockRoot)
      notSlashable =
        vc.attachedValidators[].slashingProtection.registerBlock(
          vindex, validator.pubkey, slot, signingRoot)

    logScope:
      blck = shortLog(preparedBlock.data)
      block_root = shortLog(preparedBlock.blockRoot)
      signing_root = shortLog(signingRoot)

    if notSlashable.isOk():
      let
        signature =
          try:
            let res = await validator.getBlockSignature(
              fork, genesisRoot, slot, preparedBlock.blockRoot,
              preparedBlock.data)
            if res.isErr():
              warn "Unable to sign block proposal using remote signer",
                   reason = res.error()
              return
            res.get()
          except CancelledError as exc:
            debug "Block signature process has been interrupted"
            raise exc
          except CatchableError as exc:
            error "An unexpected error occurred while signing block",
                  error_name = exc.name, error_msg = exc.msg
            return
        signedBlock = ForkedSignedBeaconBlock.init(
          preparedBlock.data, preparedBlock.blockRoot, signature)
        res =
          try:
            debug "Sending block"
            await vc.publishBlock(signedBlock, ApiStrategyKind.First)
          except ValidatorApiError as exc:
            warn "Unable to publish block", reason = exc.getFailureReason()
            return
          except CancelledError as exc:
            debug "Block publication has been interrupted"
            raise exc
          except CatchableError as exc:
            error "An unexpected error occurred while publishing block",
                  error_name = exc.name, error_msg = exc.msg
            return

      if res:
        let delay = vc.getDelay(slot.block_deadline())
        beacon_blocks_sent.inc()
        beacon_blocks_sent_delay.observe(delay.toFloatSeconds())
        notice "Block published", delay = delay
      else:
        warn "Block was not accepted by beacon node"
    else:
      warn "Slashing protection activated for block proposal"

proc proposeBlock(vc: ValidatorClientRef, slot: Slot,
                  proposerKey: ValidatorPubKey) {.async.} =
  ## Waits for ``slot`` and then runs ``publishBlock`` for the validator
  ## identified by ``proposerKey``.
  ##
  ## Returns without proposing when the wait fails, when the wall slot is
  ## already past ``slot``, or when ``getValidatorForDuties`` yields no
  ## validator. ``CancelledError`` is re-raised; any other ``CatchableError``
  ## is logged and swallowed.
  let currentSlot =
    (await vc.checkedWaitForSlot(slot, ZeroTimeDiff, false)).valueOr:
      error "Unable to perform block production because of system time"
      return

  if currentSlot > slot:
    warn "Skip block production for expired slot",
         current_slot = currentSlot, duties_slot = slot
    return

  let validator = vc.getValidatorForDuties(proposerKey, slot).valueOr:
    return

  try:
    await vc.publishBlock(currentSlot, slot, validator)
  except CancelledError as exc:
    debug "Block proposing process was interrupted",
          slot = slot, validator = shortLog(proposerKey)
    raise exc
  except CatchableError as exc:
    # Include the exception details, otherwise the failure cannot be
    # diagnosed from the logs.
    error "Unexpected error encountered while proposing block",
          slot = slot, validator = shortLog(validator),
          error_name = exc.name, error_msg = exc.msg

proc spawnProposalTask(vc: ValidatorClientRef,
                       duty: RestProposerDuty): ProposerTask =
  ## Starts ``proposeBlock`` for ``duty`` and wraps the resulting future
  ## together with the duty into a ``ProposerTask``.
  let future = proposeBlock(vc, duty.slot, duty.pubkey)
  ProposerTask(future: future, duty: duty)

proc contains(data: openArray[RestProposerDuty], task: ProposerTask): bool =
  ## Returns ``true`` if ``data`` holds a duty with the same public key and
  ## slot as the duty of ``task``.
  for item in data:
    if (item.pubkey == task.duty.pubkey) and (item.slot == task.duty.slot):
      return true
  false

proc contains(data: openArray[ProposerTask], duty: RestProposerDuty): bool =
  ## Returns ``true`` if ``data`` holds a task whose duty has the same public
  ## key and slot as ``duty``.
  for item in data:
    if (item.duty.pubkey == duty.pubkey) and (item.duty.slot == duty.slot):
      return true
  false

proc checkDuty(duty: RestProposerDuty, epoch: Epoch, slot: Slot): bool =
  ## Returns ``true`` when ``duty.slot`` is not before ``slot`` and lies before
  ## the first slot of ``epoch + 1``. Otherwise a warning is logged and
  ## ``false`` is returned.
  let lastSlot = start_slot(epoch + 1'u64)
  if duty.slot >= slot:
    if duty.slot < lastSlot:
      true
    else:
      warn "Block proposal duty is in the far future, ignoring",
           duty_slot = duty.slot, validator = shortLog(duty.pubkey),
           wall_slot = slot, last_slot_in_epoch = (lastSlot - 1'u64)
      false
  else:
    warn "Block proposal duty is in the past, ignoring",
         duty_slot = duty.slot, validator = shortLog(duty.pubkey),
         wall_slot = slot
    false

proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch,
                            dependentRoot: Eth2Digest,
                            duties: openArray[RestProposerDuty]) =
  ## Updates the proposer tasks stored in ``vc.proposers`` for ``epoch``.
  ##
  ## * When no data is stored for ``epoch`` (as reported by ``isDefault()``),
  ##   a task is spawned for every duty accepted by ``checkDuty``.
  ## * When data is stored and its dependent root differs from
  ##   ``dependentRoot``, tasks whose duty is no longer in ``duties`` are
  ##   cancelled, matching tasks are kept and tasks for new duties are spawned.
  ## * When data is stored with the same dependent root, nothing is changed.
  let
    default = ProposedData(epoch: FAR_FUTURE_EPOCH)
    currentSlot = vc.getCurrentSlot().get(Slot(0))
    epochDuties = vc.proposers.getOrDefault(epoch, default)

  if not(epochDuties.isDefault()):
    if epochDuties.dependentRoot != dependentRoot:
      warn "Proposer duties re-organization",
           duties_count = len(duties),
           wall_slot = currentSlot,
           epoch = epoch,
           prior_dependent_root = epochDuties.dependentRoot,
           dependent_root = dependentRoot
      let tasks =
        block:
          var res: seq[ProposerTask]
          var hashset = initHashSet[Slot]()

          for task in epochDuties.duties:
            if task notin duties:
              # Task is no more relevant, so cancel it.
              debug "Cancelling running proposal duty task",
                    slot = task.duty.slot,
                    validator = shortLog(task.duty.pubkey)
              task.future.cancel()
            else:
              # If task is already running for proper slot, we keep it alive.
              debug "Keep running previous proposal duty task",
                    slot = task.duty.slot,
                    validator = shortLog(task.duty.pubkey)
              # Remember the slot of the retained task, so a new duty for the
              # same slot is reported as a duplicate proposer below.
              hashset.incl(task.duty.slot)
              res.add(task)

          for duty in duties:
            if duty notin res:
              debug "New proposal duty received", slot = duty.slot,
                    validator = shortLog(duty.pubkey)
              if checkDuty(duty, epoch, currentSlot):
                let task = vc.spawnProposalTask(duty)
                if duty.slot in hashset:
                  error "Multiple block proposers for this slot, " &
                        "producing blocks for all proposers", slot = duty.slot
                else:
                  hashset.incl(duty.slot)
                res.add(task)
          res
      vc.proposers[epoch] = ProposedData.init(epoch, dependentRoot, tasks)
  else:
    debug "New block proposal duties received",
          dependent_root = dependentRoot, duties_count = len(duties),
          wall_slot = currentSlot, epoch = epoch
    # Spawn new proposer tasks and modify proposers map.
    let tasks =
      block:
        var hashset = initHashSet[Slot]()
        var res: seq[ProposerTask]
        for duty in duties:
          debug "New proposal duty received", slot = duty.slot,
                validator = shortLog(duty.pubkey)
          if checkDuty(duty, epoch, currentSlot):
            let task = vc.spawnProposalTask(duty)
            if duty.slot in hashset:
              error "Multiple block proposers for this slot, " &
                    "producing blocks for all proposers", slot = duty.slot
            else:
              hashset.incl(duty.slot)
            res.add(task)
        res
    vc.proposers[epoch] = ProposedData.init(epoch, dependentRoot, tasks)

proc waitForBlockPublished*(vc: ValidatorClientRef,
                            slot: Slot, timediff: TimeDiff) {.async.} =
  ## This procedure will wait for all the block proposal tasks to be finished at
  ## slot ``slot``.
  ##
  ## The wait is bounded by ``start_beacon_time(slot) + timediff``; if that
  ## moment has already passed, the procedure returns immediately. When there
  ## are no pending proposal tasks for ``slot``, it simply sleeps until that
  ## moment. ``CancelledError`` is re-raised.
  let
    startTime = Moment.now()
    pendingTasks =
      block:
        var res: seq[Future[void]]
        let epochDuties = vc.proposers.getOrDefault(slot.epoch())
        for task in epochDuties.duties:
          if task.duty.slot == slot:
            if not(task.future.finished()):
              res.add(task.future)
        res
    waitTime = (start_beacon_time(slot) + timediff) - vc.beaconClock.now()

  logScope:
    start_time = startTime
    pending_tasks = len(pendingTasks)
    slot = slot
    timediff = timediff

  # TODO (cheatfate): This algorithm should be tuned, when we will have ability
  # to monitor block proposals which are not created by validators bundled with
  # VC.
  logScope: wait_time = waitTime

  if waitTime.nanoseconds > 0'i64:
    if len(pendingTasks) > 0:
      # Block proposal pending
      try:
        await allFutures(pendingTasks).wait(nanoseconds(waitTime.nanoseconds))
        trace "Block proposal awaited"

        # The expected block arrived - in our async loop however, we might
        # have been doing other processing that caused delays here so we'll
        # cap the waiting to the time when we would have sent out attestations
        # had the block not arrived. An opposite case is that we received
        # (or produced) a block that has not yet reached our neighbours. To
        # protect against our attestations being dropped (because the others
        # have not yet seen the block), we'll impose a minimum delay of
        # 2000ms. The delay is enforced only when we're not hitting the
        # "normal" cutoff time for sending out attestations. An earlier delay
        # of 250ms has proven to be not enough, increasing the risk of losing
        # attestations, and with growing block sizes, 1000ms started to be
        # risky as well. Regardless, because we "just" received the block,
        # we'll impose the delay.

        # Take into consideration chains with a different slot time
        const afterBlockDelay = nanos(attestationSlotOffset.nanoseconds div 2)
        let
          afterBlockTime = vc.beaconClock.now() + afterBlockDelay
          afterBlockCutoff = vc.beaconClock.fromNow(
            min(afterBlockTime, slot.attestation_deadline() + afterBlockDelay))
        if afterBlockCutoff.inFuture:
          debug "Got block, waiting to send attestations",
                after_block_cutoff = shortLog(afterBlockCutoff.offset)
          await sleepAsync(afterBlockCutoff.offset)
      except CancelledError as exc:
        let dur = Moment.now() - startTime
        debug "Waiting for block publication interrupted", duration = dur
        raise exc
      except AsyncTimeoutError:
        let dur = Moment.now() - startTime
        debug "Block was not published in time", duration = dur
    else:
      # No pending block proposals.
      try:
        await sleepAsync(nanoseconds(waitTime.nanoseconds))
      except CancelledError as exc:
        let dur = Moment.now() - startTime
        debug "Waiting for block publication interrupted", duration = dur
        raise exc
      except CatchableError as exc:
        let dur = Moment.now() - startTime
        error "Unexpected error occured while waiting for block publication",
              err_name = exc.name, err_msg = exc.msg, duration = dur
        return

proc mainLoop(service: BlockServiceRef) {.async.} =
  ## Marks the service as running and parks on a future that is never
  ## completed; the only way out is cancellation. On exit, all unfinished
  ## proposer tasks stored in ``vc.proposers`` are cancelled and awaited.
  let vc = service.client
  service.state = ServiceState.Running
  debug "Service started"
  let future = newFuture[void]()
  try:
    # Future is not going to be completed, so the only way to exit, is to
    # cancel it.
    await future
  except CancelledError:
    debug "Service interrupted"
  except CatchableError as exc:
    error "Service crashed with unexpected error", err_name = exc.name,
          err_msg = exc.msg

  # We are going to clean up all the pending proposer tasks.
  var res: seq[Future[void]]
  for epoch, data in vc.proposers.pairs():
    for duty in data.duties.items():
      if not(duty.future.finished()):
        res.add(duty.future.cancelAndWait())
  await allFutures(res)

proc init*(t: typedesc[BlockServiceRef],
           vc: ValidatorClientRef): Future[BlockServiceRef] {.async.} =
  ## Creates a ``BlockServiceRef`` bound to ``vc`` in the
  ## ``ServiceState.Initialized`` state. The service is not started.
  logScope: service = ServiceName
  let res = BlockServiceRef(name: ServiceName, client: vc,
                            state: ServiceState.Initialized)
  debug "Initializing service"
  return res

proc start*(service: BlockServiceRef) =
  ## Starts the service main loop and stores its future in ``lifeFut``.
  service.lifeFut = mainLoop(service)