Change waitForBlockPublished -> waitForBlock(expectBlock).

This commit is contained in:
cheatfate 2023-04-12 04:07:01 +03:00
parent 3c814c60b1
commit 9266fe20af
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
5 changed files with 96 additions and 90 deletions

View File

@ -10,7 +10,7 @@ import
./rpc/rest_key_management_api,
./validator_client/[
common, fallback_service, duties_service, fork_service, block_service,
doppelganger_service, attestation_service, sync_committee_service
doppelganger_service, attestation_service, sync_committee_service,
monitor_service]
const

View File

@ -9,7 +9,7 @@ import
std/sets,
chronicles,
../validators/activity_metrics,
"."/[common, api, block_service]
"."/[common, api]
const
ServiceName = "attestation_service"
@ -332,7 +332,7 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef,
duties: seq[DutyAndProof]) {.async.} =
let vc = service.client
# Waiting for blocks to be published before attesting.
await vc.waitForBlockPublished(slot, attestationSlotOffset)
await vc.waitForBlock(slot, attestationSlotOffset)
block:
let delay = vc.getDelay(slot.attestation_deadline())
@ -431,7 +431,7 @@ proc mainLoop(service: AttestationServiceRef) {.async.} =
try:
let
# We use zero offset here, because we do waiting in
# waitForBlockPublished(attestationSlotOffset).
# waitForBlock(attestationSlotOffset).
slot = await vc.checkedWaitForNextSlot(currentSlot,
ZeroTimeDiff, false)
if slot.isNone():

View File

@ -428,85 +428,6 @@ proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch,
res
vc.proposers[epoch] = ProposedData.init(epoch, dependentRoot, tasks)
proc waitForBlockPublished*(vc: ValidatorClientRef,
slot: Slot, timediff: TimeDiff) {.async.} =
## This procedure will wait for all the block proposal tasks to be finished at
## slot ``slot``.
let
startTime = Moment.now()
pendingTasks =
block:
var res: seq[Future[void]]
let epochDuties = vc.proposers.getOrDefault(slot.epoch())
for task in epochDuties.duties:
if task.duty.slot == slot:
if not(task.future.finished()):
res.add(task.future)
res
waitTime = (start_beacon_time(slot) + timediff) - vc.beaconClock.now()
logScope:
start_time = startTime
pending_tasks = len(pendingTasks)
slot = slot
timediff = timediff
# TODO (cheatfate): This algorithm should be tuned, when we will have ability
# to monitor block proposals which are not created by validators bundled with
# VC.
logScope: wait_time = waitTime
if waitTime.nanoseconds > 0'i64:
if len(pendingTasks) > 0:
# Block proposal pending
try:
await allFutures(pendingTasks).wait(nanoseconds(waitTime.nanoseconds))
trace "Block proposal awaited"
# The expected block arrived - in our async loop however, we might
# have been doing other processing that caused delays here so we'll
# cap the waiting to the time when we would have sent out attestations
# had the block not arrived. An opposite case is that we received
# (or produced) a block that has not yet reached our neighbours. To
# protect against our attestations being dropped (because the others
# have not yet seen the block), we'll impose a minimum delay of
# 2000ms. The delay is enforced only when we're not hitting the
# "normal" cutoff time for sending out attestations. An earlier delay
# of 250ms has proven to be not enough, increasing the risk of losing
# attestations, and with growing block sizes, 1000ms started to be
# risky as well. Regardless, because we "just" received the block,
# we'll impose the delay.
# Take into consideration chains with a different slot time
const afterBlockDelay = nanos(attestationSlotOffset.nanoseconds div 2)
let
afterBlockTime = vc.beaconClock.now() + afterBlockDelay
afterBlockCutoff = vc.beaconClock.fromNow(
min(afterBlockTime,
slot.attestation_deadline() + afterBlockDelay))
if afterBlockCutoff.inFuture:
debug "Got block, waiting to send attestations",
after_block_cutoff = shortLog(afterBlockCutoff.offset)
await sleepAsync(afterBlockCutoff.offset)
except CancelledError as exc:
let dur = Moment.now() - startTime
debug "Waiting for block publication interrupted", duration = dur
raise exc
except AsyncTimeoutError:
let dur = Moment.now() - startTime
debug "Block was not published in time", duration = dur
else:
# No pending block proposals.
try:
await sleepAsync(nanoseconds(waitTime.nanoseconds))
except CancelledError as exc:
let dur = Moment.now() - startTime
debug "Waiting for block publication interrupted", duration = dur
raise exc
except CatchableError as exc:
let dur = Moment.now() - startTime
error "Unexpected error occured while waiting for block publication",
err_name = exc.name, err_msg = exc.msg, duration = dur
return
proc mainLoop(service: BlockServiceRef) {.async.} =
let vc = service.client
service.state = ServiceState.Running

View File

@ -1029,7 +1029,7 @@ proc expectBlock*(vc: ValidatorClientRef, slot: Slot,
waiter: BlockWaiter) =
data.waiters.add(waiter)
for mitem in data.waiters.mitems():
if mitem.count >= len(data.blocks):
if mitem.count <= len(data.blocks):
if not(mitem.future.finished()): mitem.future.complete(data.blocks)
vc.blocksSeen.mgetOrPut(slot, BlockDataItem()).scheduleCallbacks(waiter)
@ -1041,9 +1041,9 @@ proc registerBlock*(vc: ValidatorClientRef, data: EventBeaconBlockObject) =
wallTime = vc.beaconClock.now()
delay = wallTime - data.slot.start_beacon_time()
notice "Block received", slot = data.slot,
block_root = shortLog(data.block_root), optimistic = data.optimistic,
delay = delay
debug "Block received", slot = data.slot,
block_root = shortLog(data.block_root), optimistic = data.optimistic,
delay = delay
proc scheduleCallbacks(data: var BlockDataItem,
blck: EventBeaconBlockObject) =
@ -1068,3 +1068,88 @@ proc pruneBlocksSeen*(vc: ValidatorClientRef, epoch: Epoch) =
"[" & item.blocks.mapIt(shortLog(it)).join(", ") & "]"
debug "Block data has been pruned", slot = slot, blocks = blockRoot
vc.blocksSeen = blocksSeen
proc waitForBlock*(
vc: ValidatorClientRef,
slot: Slot,
timediff: TimeDiff,
confirmations: int = 1
) {.async.} =
## This procedure will wait for a block proposal for a ``slot`` received
## by the beacon node.
let
startTime = Moment.now()
waitTime = (start_beacon_time(slot) + timediff) - vc.beaconClock.now()
logScope:
slot = slot
timediff = timediff
wait_time = waitTime
debug "Waiting for block proposal"
if waitTime.nanoseconds <= 0'i64:
# We do not have time to wait for block.
return
let blocks =
try:
let timeout = nanoseconds(waitTime.nanoseconds)
await vc.expectBlock(slot, confirmations).wait(timeout)
except AsyncTimeoutError:
let dur = Moment.now() - startTime
debug "Block has not been received in time", duration = dur
return
except CancelledError as exc:
let dur = Moment.now() - startTime
debug "Block awaiting was interrupted", duration = dur
raise exc
except CatchableError as exc:
let dur = Moment.now() - startTime
error "Unexpected error occured while waiting for block publication",
err_name = exc.name, err_msg = exc.msg, duration = dur
return
let
dur = Moment.now() - startTime
blockRoot =
if len(blocks) == 0:
"<missing>"
elif len(blocks) == 1:
shortLog(blocks[0])
else:
"[" & blocks.mapIt(shortLog(it)).join(", ") & "]"
debug "Block proposal(s) awaited", duration = dur,
block_root = blockRoot
# The expected block arrived - in our async loop however, we might
# have been doing other processing that caused delays here so we'll
# cap the waiting to the time when we would have sent out attestations
# had the block not arrived. An opposite case is that we received
# (or produced) a block that has not yet reached our neighbours. To
# protect against our attestations being dropped (because the others
# have not yet seen the block), we'll impose a minimum delay of
# 2000ms. The delay is enforced only when we're not hitting the
# "normal" cutoff time for sending out attestations. An earlier delay
# of 250ms has proven to be not enough, increasing the risk of losing
# attestations, and with growing block sizes, 1000ms started to be
# risky as well. Regardless, because we "just" received the block,
# we'll impose the delay.
# Take into consideration chains with a different slot time
const afterBlockDelay = nanos(attestationSlotOffset.nanoseconds div 2)
let
afterBlockTime = vc.beaconClock.now() + afterBlockDelay
afterBlockCutoff = vc.beaconClock.fromNow(
min(afterBlockTime, slot.attestation_deadline() + afterBlockDelay))
if afterBlockCutoff.inFuture:
debug "Got block, waiting for block cutoff time",
after_block_cutoff = shortLog(afterBlockCutoff.offset)
try:
await sleepAsync(afterBlockCutoff.offset)
except CancelledError as exc:
let dur = Moment.now() - startTime
debug "Waiting for block cutoff was interrupted", duration = dur
raise exc

View File

@ -11,7 +11,7 @@ import
../spec/datatypes/[phase0, altair, bellatrix],
../spec/eth2_apis/rest_types,
../validators/activity_metrics,
"."/[common, api, block_service]
"."/[common, api]
const
ServiceName = "sync_committee_service"
@ -340,7 +340,7 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
async.} =
let vc = service.client
await vc.waitForBlockPublished(slot, syncCommitteeMessageSlotOffset)
await vc.waitForBlock(slot, syncCommitteeMessageSlotOffset)
block:
let delay = vc.getDelay(slot.sync_committee_message_deadline())
@ -458,7 +458,7 @@ proc mainLoop(service: SyncCommitteeServiceRef) {.async.} =
try:
let
# We use zero offset here, because we do waiting in
# waitForBlockPublished(syncCommitteeMessageSlotOffset).
# waitForBlock(syncCommitteeMessageSlotOffset).
slot = await vc.checkedWaitForNextSlot(currentSlot, ZeroTimeDiff,
false)
if slot.isNone():