Address inaccurate time calculation for waitForBlockPublished(). (#4368)

Address #4353.
This commit is contained in:
Eugene Kabanov 2022-11-29 12:52:21 +02:00 committed by GitHub
parent df54470b13
commit 6c07c6e625
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 39 additions and 40 deletions

View File

@ -346,18 +346,7 @@ proc publishAttestationsAndAggregates(service: AttestationServiceRef,
duties: seq[DutyAndProof]) {.async.} =
let vc = service.client
# Waiting for blocks to be published before attesting.
let startTime = Moment.now()
try:
let timeout = attestationSlotOffset # 4.seconds in mainnet
await vc.waitForBlockPublished(slot).wait(nanoseconds(timeout.nanoseconds))
let dur = Moment.now() - startTime
debug "Block proposal awaited", slot = slot, duration = dur
except CancelledError as exc:
debug "Block proposal waiting was interrupted"
raise exc
except AsyncTimeoutError:
let dur = Moment.now() - startTime
debug "Block was not produced in time", slot = slot, duration = dur
await vc.waitForBlockPublished(slot, attestationSlotOffset)
block:
let delay = vc.getDelay(slot.attestation_deadline())

View File

@ -426,17 +426,39 @@ proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch,
res
vc.proposers[epoch] = ProposedData.init(epoch, dependentRoot, tasks)
proc waitForBlockPublished*(vc: ValidatorClientRef, slot: Slot) {.async.} =
proc waitForBlockPublished*(vc: ValidatorClientRef,
slot: Slot, timediff: TimeDiff) {.async.} =
## This procedure will wait for all the block proposal tasks to be finished at
## slot ``slot``
let pendingTasks =
block:
var res: seq[Future[void]]
let epochDuties = vc.proposers.getOrDefault(slot.epoch())
for task in epochDuties.duties:
if task.duty.slot == slot:
if not(task.future.finished()):
res.add(task.future)
res
## slot ``slot``.
let
startTime = Moment.now()
pendingTasks =
block:
var res: seq[Future[void]]
let epochDuties = vc.proposers.getOrDefault(slot.epoch())
for task in epochDuties.duties:
if task.duty.slot == slot:
if not(task.future.finished()):
res.add(task.future)
res
logScope:
start_time = startTime
pending_tasks = len(pendingTasks)
slot = slot
timediff = timediff
if len(pendingTasks) > 0:
await allFutures(pendingTasks)
let waitTime = (start_beacon_time(slot) + timediff) - vc.beaconClock.now()
logScope:
wait_time = waitTime
if waitTime.nanoseconds > 0'i64:
try:
await allFutures(pendingTasks).wait(nanoseconds(waitTime.nanoseconds))
trace "Block proposal awaited"
except CancelledError as exc:
let dur = Moment.now() - startTime
debug "Waiting for block publication interrupted", duration = dur
raise exc
except AsyncTimeoutError:
let dur = Moment.now() - startTime
debug "Block was not published in time", duration = dur

View File

@ -332,23 +332,11 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
slot: Slot,
duties: seq[SyncCommitteeDuty])
{.async.} =
let
vc = service.client
startTime = Moment.now()
duties: seq[SyncCommitteeDuty]) {.
async.} =
let vc = service.client
try:
let timeout = syncCommitteeMessageSlotOffset
await vc.waitForBlockPublished(slot).wait(nanoseconds(timeout.nanoseconds))
let dur = Moment.now() - startTime
debug "Block proposal awaited", slot = slot, duration = dur
except CancelledError:
debug "Block proposal waiting was interrupted"
return
except AsyncTimeoutError:
let dur = Moment.now() - startTime
debug "Block was not produced in time", slot = slot, duration = dur
await vc.waitForBlockPublished(slot, syncCommitteeMessageSlotOffset)
block:
let delay = vc.getDelay(slot.sync_committee_message_deadline())