VC: Remote BN received block monitoring. (#4856)

* Initial commit with both methods enabled: `poll` and `event`.

* Address review comments.

* Address review comments.
Fix copyright years.

* After bump fixes.
This commit is contained in:
Eugene Kabanov 2023-06-08 11:44:32 +03:00 committed by GitHub
parent e8c6af0636
commit effe8b7f90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 539 additions and 114 deletions

View File

@ -120,6 +120,11 @@ type
Normal = "normal"
SingleSalt = "single-salt"
BlockMonitoringType* {.pure.} = enum
Disabled = "disabled"
Poll = "poll"
Event = "event"
BeaconNodeConf* = object
configFile* {.
desc: "Loads the configuration from a TOML file"
@ -944,6 +949,11 @@ type
defaultValueDesc: $defaultBeaconNodeUri
name: "beacon-node" .}: seq[Uri]
monitoringType* {.
desc: "Enable block monitoring which are seen by beacon node (BETA)"
defaultValue: BlockMonitoringType.Disabled
name: "block-monitor-type".}: BlockMonitoringType
SigningNodeConf* = object
configFile* {.
desc: "Loads the configuration from a TOML file"

View File

@ -3150,6 +3150,31 @@ proc decodeString*(t: typedesc[EventTopic],
else:
err("Incorrect event's topic value")
proc encodeString*(value: set[EventTopic]): Result[string, cstring] =
var res: string
if EventTopic.Head in value:
res.add("head,")
if EventTopic.Block in value:
res.add("block,")
if EventTopic.Attestation in value:
res.add("attestation,")
if EventTopic.VoluntaryExit in value:
res.add("voluntary_exit,")
if EventTopic.FinalizedCheckpoint in value:
res.add("finalized_checkpoint,")
if EventTopic.ChainReorg in value:
res.add("chain_reorg,")
if EventTopic.ContributionAndProof in value:
res.add("contribution_and_proof,")
if EventTopic.LightClientFinalityUpdate in value:
res.add("light_client_finality_update,")
if EventTopic.LightClientOptimisticUpdate in value:
res.add("light_client_optimistic_update,")
if len(res) == 0:
return err("Topics set must not be empty")
res.setLen(len(res) - 1)
ok(res)
proc decodeString*(t: typedesc[ValidatorSig],
value: string): Result[ValidatorSig, cstring] =
if len(value) != ValidatorSigSize + 2:
@ -3353,3 +3378,13 @@ proc decodeString*(t: typedesc[ConsensusFork],
of "capella": ok(ConsensusFork.Capella)
of "deneb": ok(ConsensusFork.Deneb)
else: err("Unsupported or invalid beacon block fork version")
proc decodeString*(t: typedesc[EventBeaconBlockObject],
value: string): Result[EventBeaconBlockObject, string] =
try:
ok(RestJson.decode(value, t,
requireAllFields = true,
allowUnknownFields = true))
except SerializationError as exc:
err(exc.formatMsg("<data>"))

View File

@ -110,11 +110,43 @@ proc getBlockHeaders*(slot: Option[Slot], parent_root: Option[Eth2Digest]
meth: MethodGet.}
## https://ethereum.github.io/beacon-APIs/#/Beacon/getBlockHeaders
proc getBlockHeader*(block_id: BlockIdent): RestResponse[GetBlockHeaderResponse] {.
# proc getBlockHeader*(block_id: BlockIdent): RestResponse[GetBlockHeaderResponse] {.
# rest, endpoint: "/eth/v1/beacon/headers/{block_id}",
# meth: MethodGet.}
# ## https://ethereum.github.io/beacon-APIs/#/Beacon/getBlockHeader
proc getBlockHeaderPlain*(block_id: BlockIdent): RestPlainResponse {.
rest, endpoint: "/eth/v1/beacon/headers/{block_id}",
meth: MethodGet.}
## https://ethereum.github.io/beacon-APIs/#/Beacon/getBlockHeader
proc getBlockHeader*(
client: RestClientRef,
block_id: BlockIdent
): Future[Opt[GetBlockHeaderResponse]] {.async.} =
## https://ethereum.github.io/beacon-APIs/#/Beacon/getBlockHeader
let resp = await client.getBlockHeaderPlain(block_id)
return
case resp.status
of 200:
let response = decodeBytes(GetBlockHeaderResponse, resp.data,
resp.contentType).valueOr:
raise newException(RestError, $error)
Opt.some(response)
of 404:
Opt.none(GetBlockHeaderResponse)
of 400, 500:
let error = decodeBytes(RestErrorMessage, resp.data,
resp.contentType).valueOr:
let msg = "Incorrect response error format (" & $resp.status &
") [" & $error & "]"
raise (ref RestResponseError)(msg: msg, status: resp.status)
let msg = "Error response (" & $resp.status & ") [" & error.message & "]"
raise (ref RestResponseError)(
msg: msg, status: error.code, message: error.message)
else:
raiseRestResponseError(resp)
proc publishBlock*(body: phase0.SignedBeaconBlock): RestPlainResponse {.
rest, endpoint: "/eth/v1/beacon/blocks",
meth: MethodPost.}

View File

@ -12,7 +12,7 @@ import
rest_beacon_calls, rest_builder_calls, rest_config_calls, rest_debug_calls,
rest_keymanager_calls, rest_light_client_calls,
rest_node_calls, rest_validator_calls,
rest_nimbus_calls, rest_common
rest_nimbus_calls, rest_event_calls, rest_common
]
export
@ -20,4 +20,4 @@ export
rest_beacon_calls, rest_builder_calls, rest_config_calls, rest_debug_calls,
rest_keymanager_calls, rest_light_client_calls,
rest_node_calls, rest_validator_calls,
rest_nimbus_calls, rest_common
rest_nimbus_calls, rest_event_calls, rest_common

View File

@ -0,0 +1,16 @@
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
chronos, presto/client,
"."/[rest_types, eth2_rest_serialization]
proc subscribeEventStream*(topics: set[EventTopic]): RestHttpResponseRef {.
rest, endpoint: "/eth/v1/events", accept: "text/event-stream",
meth: MethodGet.}
## https://ethereum.github.io/beacon-APIs/#/Events/eventstream

View File

@ -644,7 +644,7 @@ type
GetAggregatedAttestationResponse* = DataEnclosedObject[Attestation]
GetAttesterDutiesResponse* = DataRootEnclosedObject[seq[RestAttesterDuty]]
GetBlockAttestationsResponse* = DataEnclosedObject[seq[Attestation]]
GetBlockHeaderResponse* = DataEnclosedObject[RestBlockHeaderInfo]
GetBlockHeaderResponse* = DataOptimisticObject[RestBlockHeaderInfo]
GetBlockHeadersResponse* = DataEnclosedObject[seq[RestBlockHeaderInfo]]
GetBlockRootResponse* = DataOptimisticObject[RestRoot]
GetDebugChainHeadsResponse* = DataEnclosedObject[seq[RestChainHead]]

View File

@ -622,7 +622,7 @@ proc getIndexedErrorMessage(response: RestPlainResponse): string =
else:
"Unable to decode error response: [" & $res.error & "]"
proc getErrorMessage(response: RestPlainResponse): string =
proc getErrorMessage*(response: RestPlainResponse): string =
let res = decodeBytes(RestErrorMessage, response.data,
response.contentType)
if res.isOk():

View File

@ -9,7 +9,7 @@ import
std/sets,
chronicles,
../validators/activity_metrics,
"."/[common, api, block_service]
"."/[common, api]
const
ServiceName = "attestation_service"
@ -332,7 +332,7 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef,
duties: seq[DutyAndProof]) {.async.} =
let vc = service.client
# Waiting for blocks to be published before attesting.
await vc.waitForBlockPublished(slot, attestationSlotOffset)
await vc.waitForBlock(slot, attestationSlotOffset)
block:
let delay = vc.getDelay(slot.attestation_deadline())
@ -431,7 +431,7 @@ proc mainLoop(service: AttestationServiceRef) {.async.} =
try:
let
# We use zero offset here, because we do waiting in
# waitForBlockPublished(attestationSlotOffset).
# waitForBlock(attestationSlotOffset).
slot = await vc.checkedWaitForNextSlot(currentSlot,
ZeroTimeDiff, false)
if slot.isNone():

View File

@ -9,10 +9,14 @@ import
chronicles,
".."/validators/activity_metrics,
".."/spec/forks,
common, api
common, api, fallback_service
const
ServiceName = "block_service"
BlockPollInterval = attestationSlotOffset.nanoseconds div 4
BlockPollOffset1 = TimeDiff(nanoseconds: BlockPollInterval)
BlockPollOffset2 = TimeDiff(nanoseconds: BlockPollInterval * 2)
BlockPollOffset3 = TimeDiff(nanoseconds: BlockPollInterval * 3)
logScope: service = ServiceName
@ -428,90 +432,263 @@ proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch,
res
vc.proposers[epoch] = ProposedData.init(epoch, dependentRoot, tasks)
proc waitForBlockPublished*(vc: ValidatorClientRef,
slot: Slot, timediff: TimeDiff) {.async.} =
## This procedure will wait for all the block proposal tasks to be finished at
## slot ``slot``.
let
startTime = Moment.now()
pendingTasks =
block:
var res: seq[Future[void]]
let epochDuties = vc.proposers.getOrDefault(slot.epoch())
for task in epochDuties.duties:
if task.duty.slot == slot:
if not(task.future.finished()):
res.add(task.future)
res
waitTime = (start_beacon_time(slot) + timediff) - vc.beaconClock.now()
proc pollForEvents(service: BlockServiceRef, node: BeaconNodeServerRef,
response: RestHttpResponseRef) {.async.} =
let vc = service.client
logScope:
start_time = startTime
pending_tasks = len(pendingTasks)
slot = slot
timediff = timediff
node = node
# TODO (cheatfate): This algorithm should be tuned, when we will have ability
# to monitor block proposals which are not created by validators bundled with
# VC.
logScope: wait_time = waitTime
if waitTime.nanoseconds > 0'i64:
if len(pendingTasks) > 0:
# Block proposal pending
try:
await allFutures(pendingTasks).wait(nanoseconds(waitTime.nanoseconds))
trace "Block proposal awaited"
# The expected block arrived - in our async loop however, we might
# have been doing other processing that caused delays here so we'll
# cap the waiting to the time when we would have sent out attestations
# had the block not arrived. An opposite case is that we received
# (or produced) a block that has not yet reached our neighbours. To
# protect against our attestations being dropped (because the others
# have not yet seen the block), we'll impose a minimum delay of
# 2000ms. The delay is enforced only when we're not hitting the
# "normal" cutoff time for sending out attestations. An earlier delay
# of 250ms has proven to be not enough, increasing the risk of losing
# attestations, and with growing block sizes, 1000ms started to be
# risky as well. Regardless, because we "just" received the block,
# we'll impose the delay.
# Take into consideration chains with a different slot time
const afterBlockDelay = nanos(attestationSlotOffset.nanoseconds div 2)
let
afterBlockTime = vc.beaconClock.now() + afterBlockDelay
afterBlockCutoff = vc.beaconClock.fromNow(
min(afterBlockTime,
slot.attestation_deadline() + afterBlockDelay))
if afterBlockCutoff.inFuture:
debug "Got block, waiting to send attestations",
after_block_cutoff = shortLog(afterBlockCutoff.offset)
await sleepAsync(afterBlockCutoff.offset)
except CancelledError as exc:
let dur = Moment.now() - startTime
debug "Waiting for block publication interrupted", duration = dur
raise exc
except AsyncTimeoutError:
let dur = Moment.now() - startTime
debug "Block was not published in time", duration = dur
else:
# No pending block proposals.
while true:
let events =
try:
await sleepAsync(nanoseconds(waitTime.nanoseconds))
await response.getServerSentEvents()
except RestError as exc:
debug "Unable to receive server-sent event", reason = $exc.msg
return
except CancelledError as exc:
let dur = Moment.now() - startTime
debug "Waiting for block publication interrupted", duration = dur
raise exc
except CatchableError as exc:
let dur = Moment.now() - startTime
error "Unexpected error occured while waiting for block publication",
err_name = exc.name, err_msg = exc.msg, duration = dur
warn "Got an unexpected error, " &
"while reading server-sent event stream", reason = $exc.msg
return
for event in events:
case event.name
of "data":
let blck = EventBeaconBlockObject.decodeString(event.data).valueOr:
debug "Got invalid block event format", reason = error
return
vc.registerBlock(blck)
of "event":
if event.data != "block":
debug "Got unexpected event name field", event_name = event.name,
event_data = event.data
else:
debug "Got some unexpected event field", event_name = event.name
if len(events) == 0:
break
proc runBlockEventMonitor(service: BlockServiceRef,
node: BeaconNodeServerRef) {.async.} =
let
vc = service.client
roles = {BeaconNodeRole.BlockProposalData}
statuses = {RestBeaconNodeStatus.Synced}
logScope:
node = node
while true:
while node.status notin statuses:
await vc.waitNodes(nil, statuses, roles, false)
let response =
block:
var resp: HttpClientResponseRef
try:
resp = await node.client.subscribeEventStream({EventTopic.Block})
if resp.status == 200:
resp
else:
let body = await resp.getBodyBytes()
await resp.closeWait()
let
plain = RestPlainResponse(status: resp.status,
contentType: resp.contentType, data: body)
reason = plain.getErrorMessage()
debug "Unable to to obtain events stream", code = resp.status,
reason = reason
return
except RestError as exc:
if not(isNil(resp)): await resp.closeWait()
debug "Unable to obtain events stream", reason = $exc.msg
return
except CancelledError as exc:
if not(isNil(resp)): await resp.closeWait()
debug "Block monitoring loop has been interrupted"
raise exc
except CatchableError as exc:
if not(isNil(resp)): await resp.closeWait()
warn "Got an unexpected error while trying to establish event stream",
reason = $exc.msg
return
try:
await service.pollForEvents(node, response)
except CancelledError as exc:
raise exc
except CatchableError as exc:
warn "Got an unexpected error while receiving block events",
reason = $exc.msg
finally:
await response.closeWait()
proc pollForBlockHeaders(service: BlockServiceRef, node: BeaconNodeServerRef,
slot: Slot, waitTime: Duration,
index: int): Future[bool] {.async.} =
let vc = service.client
logScope:
node = node
slot = slot
wait_time = waitTime
schedule_index = index
trace "Polling for block header"
let bres =
try:
await sleepAsync(waitTime)
await node.client.getBlockHeader(BlockIdent.init(slot))
except RestError as exc:
debug "Unable to obtain block header",
reason = $exc.msg, error = $exc.name
return false
except RestResponseError as exc:
debug "Got an error while trying to obtain block header",
reason = exc.message, status = exc.status
return false
except CancelledError as exc:
raise exc
except CatchableError as exc:
warn "Unexpected error encountered while receiving block header",
reason = $exc.msg, error = $exc.name
return false
if bres.isNone():
trace "Beacon node does not yet have block"
return false
let blockHeader = bres.get()
let eventBlock = EventBeaconBlockObject(
slot: blockHeader.data.header.message.slot,
block_root: blockHeader.data.root,
optimistic: blockHeader.execution_optimistic
)
vc.registerBlock(eventBlock)
return true
proc runBlockPollMonitor(service: BlockServiceRef,
node: BeaconNodeServerRef) {.async.} =
let
vc = service.client
roles = {BeaconNodeRole.BlockProposalData}
statuses = {RestBeaconNodeStatus.Synced}
logScope:
node = node
while true:
let currentSlot =
block:
let res = await vc.checkedWaitForNextSlot(ZeroTimeDiff, false)
if res.isNone(): continue
res.geT()
while node.status notin statuses:
await vc.waitNodes(nil, statuses, roles, false)
let
currentTime = vc.beaconClock.now()
afterSlot = currentTime.slotOrZero()
if currentTime > afterSlot.attestation_deadline():
# Attestation time already, lets wait for next slot.
continue
let
pollTime1 = afterSlot.start_beacon_time() + BlockPollOffset1
pollTime2 = afterSlot.start_beacon_time() + BlockPollOffset2
pollTime3 = afterSlot.start_beacon_time() + BlockPollOffset3
var pendingTasks =
block:
var res: seq[FutureBase]
if currentTime <= pollTime1:
let stime = nanoseconds((pollTime1 - currentTime).nanoseconds)
res.add(FutureBase(
service.pollForBlockHeaders(node, afterSlot, stime, 0)))
if currentTime <= pollTime2:
let stime = nanoseconds((pollTime2 - currentTime).nanoseconds)
res.add(FutureBase(
service.pollForBlockHeaders(node, afterSlot, stime, 1)))
if currentTime <= pollTime3:
let stime = nanoseconds((pollTime3 - currentTime).nanoseconds)
res.add(FutureBase(
service.pollForBlockHeaders(node, afterSlot, stime, 2)))
res
try:
while true:
let completedFuture = await race(pendingTasks)
let blockReceived =
block:
var res = false
for future in pendingTasks:
if not(future.done()): continue
if not(cast[Future[bool]](future).read()): continue
res = true
break
res
if blockReceived:
var pending: seq[Future[void]]
for future in pendingTasks:
if not(future.finished()): pending.add(future.cancelAndWait())
await allFutures(pending)
break
pendingTasks.keepItIf(it != completedFuture)
if len(pendingTasks) == 0: break
except CancelledError as exc:
var pending: seq[Future[void]]
for future in pendingTasks:
if not(future.finished()): pending.add(future.cancelAndWait())
await allFutures(pending)
raise exc
except CatchableError as exc:
warn "An unexpected error occurred while running block monitoring",
reason = $exc.msg, error = $exc.name
proc runBlockMonitor(service: BlockServiceRef) {.async.} =
let
vc = service.client
blockNodes = vc.filterNodes(AllBeaconNodeStatuses,
{BeaconNodeRole.BlockProposalData})
let pendingTasks =
case vc.config.monitoringType
of BlockMonitoringType.Disabled:
debug "Block monitoring disabled"
@[newFuture[void]("block.monitor.disabled")]
of BlockMonitoringType.Poll:
var res: seq[Future[void]]
for node in blockNodes:
res.add(service.runBlockPollMonitor(node))
res
of BlockMonitoringType.Event:
var res: seq[Future[void]]
for node in blockNodes:
res.add(service.runBlockEventMonitor(node))
res
try:
await allFutures(pendingTasks)
except CancelledError as exc:
var pending: seq[Future[void]]
for future in pendingTasks:
if not(future.finished()): pending.add(future.cancelAndWait())
await allFutures(pending)
raise exc
except CatchableError as exc:
warn "An unexpected error occurred while running block monitoring",
reason = $exc.msg, error = $exc.name
return
proc mainLoop(service: BlockServiceRef) {.async.} =
let vc = service.client
service.state = ServiceState.Running
debug "Service started"
var future = newFuture[void]()
let future = service.runBlockMonitor()
try:
# Future is not going to be completed, so the only way to exit, is to
# cancel it.

View File

@ -15,7 +15,9 @@ import
".."/spec/[eth2_merkleization, helpers, signatures, validator],
".."/spec/eth2_apis/[eth2_rest_serialization, rest_beacon_client,
dynamic_fee_recipients],
".."/validators/[keystore_management, validator_pool, slashing_protection],
".."/consensus_object_pools/block_pools_types,
".."/validators/[keystore_management, validator_pool, slashing_protection,
validator_duties],
".."/[conf, beacon_clock, version, nimbus_binary_common]
from std/times import Time, toUnix, fromUnix, getTime
@ -26,7 +28,7 @@ export
byteutils, presto_client, eth2_rest_serialization, rest_beacon_client,
phase0, altair, helpers, signatures, validator, eth2_merkleization,
beacon_clock, keystore_management, slashing_protection, validator_pool,
dynamic_fee_recipients, Time, toUnix, fromUnix, getTime
dynamic_fee_recipients, Time, toUnix, fromUnix, getTime, block_pools_types
const
SYNC_TOLERANCE* = 4'u64
@ -151,6 +153,14 @@ type
DoppelgangerAttempt* {.pure.} = enum
None, Failure, SuccessTrue, SuccessFalse
BlockWaiter* = object
future*: Future[seq[Eth2Digest]]
count*: int
BlockDataItem* = object
blocks: seq[Eth2Digest]
waiters*: seq[BlockWaiter]
ValidatorClient* = object
config*: ValidatorClientConf
metricsServer*: Opt[MetricsHttpServerRef]
@ -186,6 +196,7 @@ type
proposerTasks*: Table[Slot, seq[ProposerTask]]
dynamicFeeRecipientsStore*: ref DynamicFeeRecipientsStore
validatorsRegCache*: Table[ValidatorPubKey, SignedValidatorRegistrationV1]
blocksSeen*: Table[Slot, BlockDataItem]
rng*: ref HmacDrbgContext
ApiStrategyKind* {.pure.} = enum
@ -224,6 +235,18 @@ const
BeaconNodeRole.SyncCommitteeData,
BeaconNodeRole.SyncCommitteePublish,
}
AllBeaconNodeStatuses* = {
RestBeaconNodeStatus.Offline,
RestBeaconNodeStatus.Online,
RestBeaconNodeStatus.Incompatible,
RestBeaconNodeStatus.Compatible,
RestBeaconNodeStatus.NotSynced,
RestBeaconNodeStatus.OptSynced,
RestBeaconNodeStatus.Synced,
RestBeaconNodeStatus.UnexpectedCode,
RestBeaconNodeStatus.UnexpectedResponse,
RestBeaconNodeStatus.InternalError
}
proc `$`*(roles: set[BeaconNodeRole]): string =
if card(roles) > 0:
@ -1084,3 +1107,127 @@ proc checkedWaitForNextSlot*(vc: ValidatorClientRef, curSlot: Opt[Slot],
nextSlot = currentSlot + 1
vc.checkedWaitForSlot(nextSlot, offset, showLogs)
proc checkedWaitForNextSlot*(vc: ValidatorClientRef, offset: TimeDiff,
showLogs: bool): Future[Opt[Slot]] =
let
currentTime = vc.beaconClock.now()
currentSlot = currentTime.slotOrZero()
nextSlot = currentSlot + 1
vc.checkedWaitForSlot(nextSlot, offset, showLogs)
proc expectBlock*(vc: ValidatorClientRef, slot: Slot,
confirmations: int = 1): Future[seq[Eth2Digest]] =
var
retFuture = newFuture[seq[Eth2Digest]]("expectBlock")
waiter = BlockWaiter(future: retFuture, count: confirmations)
proc cancellation(udata: pointer) =
vc.blocksSeen.withValue(slot, adata):
adata[].waiters.keepItIf(it.future != retFuture)
proc scheduleCallbacks(data: var BlockDataItem,
waiter: BlockWaiter) =
data.waiters.add(waiter)
for mitem in data.waiters.mitems():
if mitem.count <= len(data.blocks):
if not(mitem.future.finished()): mitem.future.complete(data.blocks)
vc.blocksSeen.mgetOrPut(slot, BlockDataItem()).scheduleCallbacks(waiter)
if not(retFuture.finished()): retFuture.cancelCallback = cancellation
retFuture
proc registerBlock*(vc: ValidatorClientRef, data: EventBeaconBlockObject) =
let
wallTime = vc.beaconClock.now()
delay = wallTime - data.slot.start_beacon_time()
debug "Block received", slot = data.slot,
block_root = shortLog(data.block_root), optimistic = data.optimistic,
delay = delay
proc scheduleCallbacks(data: var BlockDataItem,
blck: EventBeaconBlockObject) =
data.blocks.add(blck.block_root)
for mitem in data.waiters.mitems():
if mitem.count >= len(data.blocks):
if not(mitem.future.finished()): mitem.future.complete(data.blocks)
vc.blocksSeen.mgetOrPut(data.slot, BlockDataItem()).scheduleCallbacks(data)
proc pruneBlocksSeen*(vc: ValidatorClientRef, epoch: Epoch) =
var blocksSeen: Table[Slot, BlockDataItem]
for slot, item in vc.blocksSeen.pairs():
if (slot.epoch() + HISTORICAL_DUTIES_EPOCHS) >= epoch:
blocksSeen[slot] = item
else:
let blockRoot =
if len(item.blocks) == 0:
"<missing>"
elif len(item.blocks) == 1:
shortLog(item.blocks[0])
else:
"[" & item.blocks.mapIt(shortLog(it)).join(", ") & "]"
debug "Block data has been pruned", slot = slot, blocks = blockRoot
vc.blocksSeen = blocksSeen
proc waitForBlock*(
vc: ValidatorClientRef,
slot: Slot,
timediff: TimeDiff,
confirmations: int = 1
) {.async.} =
## This procedure will wait for a block proposal for a ``slot`` received
## by the beacon node.
let
startTime = Moment.now()
waitTime = (start_beacon_time(slot) + timediff) - vc.beaconClock.now()
logScope:
slot = slot
timediff = timediff
wait_time = waitTime
debug "Waiting for block proposal"
if waitTime.nanoseconds <= 0'i64:
# We do not have time to wait for block.
return
let blocks =
try:
let timeout = nanoseconds(waitTime.nanoseconds)
await vc.expectBlock(slot, confirmations).wait(timeout)
except AsyncTimeoutError:
let dur = Moment.now() - startTime
debug "Block has not been received in time", duration = dur
return
except CancelledError as exc:
let dur = Moment.now() - startTime
debug "Block awaiting was interrupted", duration = dur
raise exc
except CatchableError as exc:
let dur = Moment.now() - startTime
error "Unexpected error occured while waiting for block publication",
err_name = exc.name, err_msg = exc.msg, duration = dur
return
let
dur = Moment.now() - startTime
blockRoot =
if len(blocks) == 0:
"<missing>"
elif len(blocks) == 1:
shortLog(blocks[0])
else:
"[" & blocks.mapIt(shortLog(it)).join(", ") & "]"
debug "Block proposal awaited", duration = dur,
block_root = blockRoot
try:
await waitAfterBlockCutoff(vc.beaconClock, slot)
except CancelledError as exc:
let dur = Moment.now() - startTime
debug "Waiting for block cutoff was interrupted", duration = dur
raise exc

View File

@ -11,7 +11,7 @@ import
../spec/datatypes/[phase0, altair, bellatrix],
../spec/eth2_apis/rest_types,
../validators/activity_metrics,
"."/[common, api, block_service]
"."/[common, api]
const
ServiceName = "sync_committee_service"
@ -340,7 +340,7 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
async.} =
let vc = service.client
await vc.waitForBlockPublished(slot, syncCommitteeMessageSlotOffset)
await vc.waitForBlock(slot, syncCommitteeMessageSlotOffset)
block:
let delay = vc.getDelay(slot.sync_committee_message_deadline())
@ -457,7 +457,7 @@ proc mainLoop(service: SyncCommitteeServiceRef) {.async.} =
try:
let
# We use zero offset here, because we do waiting in
# waitForBlockPublished(syncCommitteeMessageSlotOffset).
# waitForBlock(syncCommitteeMessageSlotOffset).
slot = await vc.checkedWaitForNextSlot(currentSlot, ZeroTimeDiff,
false)
if slot.isNone():

View File

@ -1552,6 +1552,42 @@ proc updateValidators(
index: index, validator: validators[int index]
)))
proc waitAfterBlockCutoff*(clock: BeaconClock, slot: Slot,
head: Opt[BlockRef] = Opt.none(BlockRef)) {.async.} =
# The expected block arrived (or expectBlock was called again which
# shouldn't happen as this is the only place we use it) - in our async
# loop however, we might have been doing other processing that caused delays
# here so we'll cap the waiting to the time when we would have sent out
# attestations had the block not arrived.
# An opposite case is that we received (or produced) a block that has
# not yet reached our neighbours. To protect against our attestations
# being dropped (because the others have not yet seen the block), we'll
# impose a minimum delay of 2000ms. The delay is enforced only when we're
# not hitting the "normal" cutoff time for sending out attestations.
# An earlier delay of 250ms has proven to be not enough, increasing the
# risk of losing attestations, and with growing block sizes, 1000ms
# started to be risky as well.
# Regardless, because we "just" received the block, we'll impose the
# delay.
# Take into consideration chains with a different slot time
const afterBlockDelay = nanos(attestationSlotOffset.nanoseconds div 2)
let
afterBlockTime = clock.now() + afterBlockDelay
afterBlockCutoff = clock.fromNow(
min(afterBlockTime, slot.attestation_deadline() + afterBlockDelay))
if afterBlockCutoff.inFuture:
if head.isSome():
debug "Got block, waiting to send attestations",
head = shortLog(head.get()), slot = slot,
afterBlockCutoff = shortLog(afterBlockCutoff.offset)
else:
debug "Got block, waiting to send attestations",
slot = slot, afterBlockCutoff = shortLog(afterBlockCutoff.offset)
await sleepAsync(afterBlockCutoff.offset)
proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
## Perform validator duties - create blocks, vote and aggregate existing votes
if node.attachedValidators[].count == 0:
@ -1624,35 +1660,7 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
# Wait either for the block or the attestation cutoff time to arrive
if await node.consensusManager[].expectBlock(slot)
.withTimeout(attestationCutoff.offset):
# The expected block arrived (or expectBlock was called again which
# shouldn't happen as this is the only place we use it) - in our async
# loop however, we might have been doing other processing that caused delays
# here so we'll cap the waiting to the time when we would have sent out
# attestations had the block not arrived.
# An opposite case is that we received (or produced) a block that has
# not yet reached our neighbours. To protect against our attestations
# being dropped (because the others have not yet seen the block), we'll
# impose a minimum delay of 2000ms. The delay is enforced only when we're
# not hitting the "normal" cutoff time for sending out attestations.
# An earlier delay of 250ms has proven to be not enough, increasing the
# risk of losing attestations, and with growing block sizes, 1000ms
# started to be risky as well.
# Regardless, because we "just" received the block, we'll impose the
# delay.
# Take into consideration chains with a different slot time
const afterBlockDelay = nanos(attestationSlotOffset.nanoseconds div 2)
let
afterBlockTime = node.beaconClock.now() + afterBlockDelay
afterBlockCutoff = node.beaconClock.fromNow(
min(afterBlockTime, slot.attestation_deadline() + afterBlockDelay))
if afterBlockCutoff.inFuture:
debug "Got block, waiting to send attestations",
head = shortLog(head),
afterBlockCutoff = shortLog(afterBlockCutoff.offset)
await sleepAsync(afterBlockCutoff.offset)
await waitAfterBlockCutoff(node.beaconClock, slot, Opt.some(head))
# Time passed - we might need to select a new head in that case
node.consensusManager[].updateHead(slot)