VC: Refactor some timing code around sync committee processing (#6073)

* Add some duration metering.
Refactor some log statements.
Rework sync contribution deadline waiting.
Add some cancellation reporting handlers.

* Make all validator's shortLog to become validatorLog.
Optimize some logs with logScope.

* Add `raises`.

* More log statements polishing.
This commit is contained in:
Eugene Kabanov 2024-03-22 04:37:44 +02:00 committed by GitHub
parent 9d5643240b
commit a6e9e0774c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 147 additions and 116 deletions

View File

@ -5,6 +5,8 @@
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
std/sets,
chronicles,
@ -167,7 +169,7 @@ proc produceAndPublishAttestations*(service: AttestationServiceRef,
if (duty.data.slot != data.slot) or
(uint64(duty.data.committee_index) != data.index):
warn "Inconsistent validator duties during attestation signing",
validator = shortLog(duty.data.pubkey),
pubkey = shortLog(duty.data.pubkey),
duty_slot = duty.data.slot,
duty_index = duty.data.committee_index,
attestation_slot = data.slot, attestation_index = data.index

View File

@ -59,7 +59,7 @@ proc produceBlock(
logScope:
slot = slot
wall_slot = currentSlot
validator = shortLog(validator)
validator = validatorLog(validator)
let
produceBlockResponse =
try:
@ -120,7 +120,7 @@ proc produceBlindedBlock(
logScope:
slot = slot
wall_slot = currentSlot
validator = shortLog(validator)
validator = validatorLog(validator)
let
beaconBlock =
try:
@ -178,17 +178,17 @@ proc prepareRandao(vc: ValidatorClientRef, slot: Slot,
timeElapsed = Moment.now() - start
if rsig.isErr():
debug "Unable to prepare RANDAO signature", epoch = epoch,
validator = shortLog(validator), elapsed_time = timeElapsed,
validator = validatorLog(validator), elapsed_time = timeElapsed,
current_slot = currentSlot, destination_slot = destSlot,
delay = vc.getDelay(deadline)
else:
debug "RANDAO signature has been prepared", epoch = epoch,
validator = shortLog(validator), elapsed_time = timeElapsed,
validator = validatorLog(validator), elapsed_time = timeElapsed,
current_slot = currentSlot, destination_slot = destSlot,
delay = vc.getDelay(deadline)
else:
debug "RANDAO signature preparation timed out", epoch = epoch,
validator = shortLog(validator),
validator = validatorLog(validator),
current_slot = currentSlot, destination_slot = destSlot,
delay = vc.getDelay(deadline)
@ -225,7 +225,7 @@ proc publishBlockV3(vc: ValidatorClientRef, currentSlot, slot: Slot,
vindex = validator.index.get()
logScope:
validator = shortLog(validator)
validator = validatorLog(validator)
validator_index = vindex
slot = slot
wall_slot = currentSlot
@ -415,7 +415,7 @@ proc publishBlockV2(vc: ValidatorClientRef, currentSlot, slot: Slot,
vindex = validator.index.get()
logScope:
validator = shortLog(validator)
validator = validatorLog(validator)
validator_index = vindex
slot = slot
wall_slot = currentSlot
@ -634,7 +634,7 @@ proc publishBlock(vc: ValidatorClientRef, currentSlot, slot: Slot,
vindex = validator.index.get()
logScope:
validator = shortLog(validator)
validator = validatorLog(validator)
validator_index = vindex
slot = slot
wall_slot = currentSlot
@ -690,11 +690,11 @@ proc proposeBlock(vc: ValidatorClientRef, slot: Slot,
await vc.publishBlock(currentSlot, slot, validator)
except CancelledError as exc:
debug "Block proposing process was interrupted",
slot = slot, validator = shortLog(proposerKey)
slot = slot, validator = validatorLog(validator)
raise exc
except CatchableError:
error "Unexpected error encountered while proposing block",
slot = slot, validator = shortLog(validator)
slot = slot, validator = validatorLog(validator)
proc contains(data: openArray[RestProposerDuty], task: ProposerTask): bool =
for item in data:
@ -715,12 +715,12 @@ proc checkDuty(duty: RestProposerDuty, epoch: Epoch, slot: Slot): bool =
true
else:
warn "Block proposal duty is in the far future, ignoring",
duty_slot = duty.slot, validator = shortLog(duty.pubkey),
duty_slot = duty.slot, pubkey = shortLog(duty.pubkey),
wall_slot = slot, last_slot_in_epoch = (lastSlot - 1'u64)
false
else:
warn "Block proposal duty is in the past, ignoring", duty_slot = duty.slot,
validator = shortLog(duty.pubkey), wall_slot = slot
pubkey = shortLog(duty.pubkey), wall_slot = slot
false
proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch,
@ -747,20 +747,20 @@ proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch,
# Task is no more relevant, so cancel it.
debug "Cancelling running proposal duty tasks",
slot = task.duty.slot,
validator = shortLog(task.duty.pubkey)
pubkey = shortLog(task.duty.pubkey)
task.proposeFut.cancelSoon()
task.randaoFut.cancelSoon()
else:
# If task is already running for proper slot, we keep it alive.
debug "Keep running previous proposal duty tasks",
slot = task.duty.slot,
validator = shortLog(task.duty.pubkey)
pubkey = shortLog(task.duty.pubkey)
res.add(task)
for duty in duties:
if duty notin res:
debug "New proposal duty received", slot = duty.slot,
validator = shortLog(duty.pubkey)
info "Received new proposer duty", slot = duty.slot,
pubkey = shortLog(duty.pubkey)
if checkDuty(duty, epoch, currentSlot):
let task = vc.spawnProposalTask(duty)
if duty.slot in hashset:
@ -781,8 +781,8 @@ proc addOrReplaceProposers*(vc: ValidatorClientRef, epoch: Epoch,
var hashset = initHashSet[Slot]()
var res: seq[ProposerTask]
for duty in duties:
debug "New proposal duty received", slot = duty.slot,
validator = shortLog(duty.pubkey)
info "Received new proposer duty", slot = duty.slot,
pubkey = shortLog(duty.pubkey)
if checkDuty(duty, epoch, currentSlot):
let task = vc.spawnProposalTask(duty)
if duty.slot in hashset:

View File

@ -1003,7 +1003,7 @@ proc getValidatorRegistration(
): Result[PendingValidatorRegistration, RegistrationKind] =
if validator.index.isNone():
debug "Validator registration missing validator index",
validator = shortLog(validator)
validator = validatorLog(validator)
return err(RegistrationKind.MissingIndex)
let
@ -1039,13 +1039,13 @@ proc getValidatorRegistration(
if not(sigfut.completed()):
let exc = sigfut.error()
debug "Got unexpected exception while signing validator registration",
validator = shortLog(validator), error_name = $exc.name,
error_msg = $exc.msg
validator = validatorLog(validator), error = exc.name,
reason = exc.msg
return err(RegistrationKind.ErrorSignature)
let sigres = sigfut.value()
if sigres.isErr():
debug "Failed to get signature for validator registration",
validator = shortLog(validator), error = sigres.error()
validator = validatorLog(validator), reason = sigres.error()
return err(RegistrationKind.NoSignature)
registration.signature = sigres.get()
# Updating cache table with new signed registration data

View File

@ -5,6 +5,8 @@
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import chronicles
import "."/[common, api]
@ -35,7 +37,7 @@ proc processActivities(service: DoppelgangerServiceRef, epoch: Epoch,
if item.is_live and validator.triggersDoppelganger(epoch):
warn "Doppelganger detection triggered",
validator = shortLog(validator), epoch
validator = validatorLog(validator), epoch
vc.doppelExit.fire()
return

View File

@ -271,9 +271,9 @@ proc pollForSyncCommitteeDuties*(
res
for item in addOrReplaceItems:
vc.syncCommitteeDuties.mgetOrPut(item.duty.pubkey,
default(SyncPeriodDuties)).duties[item.period] =
item.duty
vc.syncCommitteeDuties.mgetOrPut(
item.duty.pubkey, default(SyncPeriodDuties)).duties[item.period] =
item.duty
len(addOrReplaceItems)
proc pruneAttesterDuties(service: DutiesServiceRef, epoch: Epoch) =
@ -285,8 +285,8 @@ proc pruneAttesterDuties(service: DutiesServiceRef, epoch: Epoch) =
if (epochKey + HISTORICAL_DUTIES_EPOCHS) >= epoch:
v.duties[epochKey] = epochDuty
else:
debug "Attester duties for the epoch has been pruned", validator = key,
epoch = epochKey, loop = AttesterLoop
debug "Attester duties for the epoch has been pruned",
pubkey = shortLog(key), epoch = epochKey, loop = AttesterLoop
attesters[key] = v
vc.attesters = attesters

View File

@ -217,7 +217,7 @@ proc fillAttestationSelectionProofs*(
notice "Found missing validator while processing " &
"beacon committee selections", validator_index = vindex,
slot = selection.slot,
validator = shortLog(key.get()),
pubkey = shortLog(key.get()),
selection_proof = shortLog(selection.selection_proof)
continue
@ -483,7 +483,7 @@ proc fillSyncCommitteeSelectionProofs*(
notice "Found missing validator while processing " &
"sync committee selections", validator_index = vindex,
slot = slot,
validator = shortLog(key.get()),
pubkey = shortLog(key.get()),
selection_proof = shortLog(selection.selection_proof)
continue
request =
@ -494,7 +494,7 @@ proc fillSyncCommitteeSelectionProofs*(
warn "Found sync committee selection proof which was not " &
"requested",
slot = slot, subcommittee_index = subcommittee_index,
validator = shortLog(validator),
validator = validatorLog(validator),
selection_proof = shortLog(selection.selection_proof)
continue
res.get()

View File

@ -5,6 +5,8 @@
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
std/sets,
metrics, chronicles,
@ -31,25 +33,33 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
async.} =
let
vc = service.client
startTime = Moment.now()
fork = vc.forkAtEpoch(slot.epoch)
genesisValidatorsRoot = vc.beaconGenesis.genesis_validators_root
vindex = duty.validator_index
validator = vc.getValidatorForDuties(
duty.pubkey, slot, slashingSafe = true).valueOr: return false
logScope:
validator = validatorLog(validator)
block_root = shortLog(beaconBlockRoot)
slot = slot
let
message =
block:
let res = await getSyncCommitteeMessage(validator, fork,
genesisValidatorsRoot,
slot, beaconBlockRoot)
if res.isErr():
warn "Unable to sign committee message using remote signer",
validator = shortLog(validator), slot = slot,
block_root = shortLog(beaconBlockRoot)
warn "Unable to sign committee message using remote signer"
return
res.get()
debug "Sending sync committee message", message = shortLog(message),
validator = shortLog(validator), validator_index = vindex,
logScope:
message = shortLog(message)
debug "Sending sync committee message",
delay = vc.getDelay(message.slot.sync_committee_message_deadline())
let res =
@ -57,9 +67,6 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
await vc.submitPoolSyncCommitteeSignature(message, ApiStrategyKind.First)
except ValidatorApiError as exc:
warn "Unable to publish sync committee message",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex,
reason = exc.getFailureReason()
return false
except CancelledError:
@ -67,35 +74,31 @@ proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
return false
except CatchableError as exc:
error "Unexpected error occurred while publishing sync committee message",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex,
err_name = exc.name, err_msg = exc.msg
error = exc.name, reason = exc.msg
return false
let delay = vc.getDelay(message.slot.sync_committee_message_deadline())
let
delay = vc.getDelay(message.slot.sync_committee_message_deadline())
dur = Moment.now() - startTime
if res:
beacon_sync_committee_messages_sent.inc()
beacon_sync_committee_message_sent_delay.observe(delay.toFloatSeconds())
notice "Sync committee message published",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex,
delay = delay
validator_index = vindex, delay = delay, duration = dur
else:
warn "Sync committee message was not accepted by beacon node",
message = shortLog(message),
validator = shortLog(validator),
validator_index = vindex, delay = delay
return res
validator_index = vindex, delay = delay, duration = dur
res
proc produceAndPublishSyncCommitteeMessages(service: SyncCommitteeServiceRef,
slot: Slot,
beaconBlockRoot: Eth2Digest,
duties: seq[SyncCommitteeDuty])
{.async.} =
let vc = service.client
let
vc = service.client
startTime = Moment.now()
let pendingSyncCommitteeMessages =
block:
@ -128,38 +131,54 @@ proc produceAndPublishSyncCommitteeMessages(service: SyncCommitteeServiceRef,
inc(errored)
(succeed, errored, failed)
let delay = vc.getDelay(slot.attestation_deadline())
let
delay = vc.getDelay(slot.attestation_deadline())
dur = Moment.now() - startTime
debug "Sync committee message statistics",
total = len(pendingSyncCommitteeMessages),
succeed = statistics[0], failed_to_deliver = statistics[1],
not_accepted = statistics[2], delay = delay, slot = slot,
duties_count = len(duties)
not_accepted = statistics[2], delay = delay, duration = dur,
slot = slot, duties_count = len(duties)
proc serveContributionAndProof*(service: SyncCommitteeServiceRef,
proof: ContributionAndProof,
validator: AttachedValidator): Future[bool] {.
async.} =
## Signs ConributionAndProof object and sends it to BN.
let
vc = service.client
startTime = Moment.now()
slot = proof.contribution.slot
validatorIdx = validator.index.get()
genesisRoot = vc.beaconGenesis.genesis_validators_root
fork = vc.forkAtEpoch(slot.epoch)
logScope:
validator = validatorLog(validator)
contribution = shortLog(proof.contribution)
let signature =
block:
let res = await validator.getContributionAndProofSignature(
fork, genesisRoot, proof)
let res =
try:
await validator.getContributionAndProofSignature(
fork, genesisRoot, proof)
except CancelledError:
debug "Sync contribution signing process was interrupted"
return false
except CatchableError as exc:
error "Unexpected error occurred while signing sync contribution",
error = exc.name, reason = exc.msg
return false
if res.isErr():
warn "Unable to sign sync committee contribution using remote signer",
validator = shortLog(validator),
contribution = shortLog(proof.contribution),
error_msg = res.error()
reason = res.error()
return false
res.get()
debug "Sending sync contribution",
contribution = shortLog(proof.contribution),
validator = shortLog(validator), validator_index = validatorIdx,
delay = vc.getDelay(slot.sync_contribution_deadline())
let restSignedProof = RestSignedContributionAndProof.init(
@ -171,39 +190,35 @@ proc serveContributionAndProof*(service: SyncCommitteeServiceRef,
ApiStrategyKind.First)
except ValidatorApiError as exc:
warn "Unable to publish sync contribution",
contribution = shortLog(proof.contribution),
validator = shortLog(validator),
validator_index = validatorIdx,
err_msg = exc.msg,
reason = exc.getFailureReason()
false
except CancelledError:
debug "Publish sync contribution request was interrupted"
debug "Publication process of sync contribution was interrupted"
return false
except CatchableError as err:
error "Unexpected error occurred while publishing sync contribution",
contribution = shortLog(proof.contribution),
validator = shortLog(validator),
err_name = err.name, err_msg = err.msg
error = err.name, reason = err.msg
false
let dur = Moment.now() - startTime
if res:
beacon_sync_committee_contributions_sent.inc()
notice "Sync contribution published",
validator = shortLog(validator),
validator_index = validatorIdx
notice "Sync contribution published", duration = dur
else:
warn "Sync contribution was not accepted by beacon node",
contribution = shortLog(proof.contribution),
validator = shortLog(validator),
validator_index = validatorIdx
return res
warn "Sync contribution was not accepted by beacon node", duration = dur
res
proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
slot: Slot,
beaconBlockRoot: Eth2Digest,
duties: seq[SyncCommitteeDuty]) {.async.} =
let vc = service.client
let
vc = service.client
startTime = Moment.now()
logScope:
slot = slot
block_root = shortLog(beaconBlockRoot)
var (contributions, pendingFutures, contributionsMap) =
block:
@ -263,12 +278,9 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
if index notin completed: completed.add(index)
let aggContribution =
try:
let res = future.read()
Opt.some(res)
Opt.some(future.read())
except ValidatorApiError as exc:
warn "Unable to get sync message contribution data",
slot = slot,
beacon_block_root = shortLog(beaconBlockRoot),
reason = exc.getFailureReason()
Opt.none(SyncCommitteeContribution)
except CancelledError as exc:
@ -277,9 +289,8 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
raise exc
except CatchableError as exc:
error "Unexpected error occurred while getting sync " &
"message contribution", slot = slot,
beacon_block_root = shortLog(beaconBlockRoot),
err_name = exc.name, err_msg = exc.msg
"message contribution",
error = exc.name, reason = exc.msg
Opt.none(SyncCommitteeContribution)
if aggContribution.isSome():
@ -319,17 +330,20 @@ proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
inc(errored)
(succeed, errored, failed)
let delay = vc.getDelay(slot.aggregate_deadline())
let
delay = vc.getDelay(slot.aggregate_deadline())
dur = Moment.now() - startTime
debug "Sync message contribution statistics",
total = len(contributions),
succeed = statistics[0],
failed_to_create = len(pendingAggregates) - len(contributions),
failed_to_deliver = statistics[1],
not_accepted = statistics[2],
delay = delay, slot = slot
delay = delay, duration = dur
else:
debug "No contribution and proofs scheduled for the slot", slot = slot
debug "No contribution and proofs scheduled for the slot"
proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
slot: Slot,
@ -339,9 +353,12 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
await vc.waitForBlock(slot, syncCommitteeMessageSlotOffset)
logScope:
slot = slot
block:
let delay = vc.getDelay(slot.sync_committee_message_deadline())
debug "Producing sync committee messages", delay = delay, slot = slot,
debug "Producing sync committee messages", delay = delay,
duties_count = len(duties)
let beaconBlockRoot =
@ -357,7 +374,7 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
res.data.root
else:
if res.execution_optimistic.get():
notice "Execution client not in sync", slot = slot
notice "Execution client not in sync"
return
res.data.root
except ValidatorApiError as exc:
@ -369,38 +386,45 @@ proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
return
except CatchableError as exc:
error "Unexpected error while requesting sync message block root",
err_name = exc.name, err_msg = exc.msg, slot = slot
error = exc.name, reason = exc.msg
return
try:
await service.produceAndPublishSyncCommitteeMessages(slot,
beaconBlockRoot,
duties)
await service.produceAndPublishSyncCommitteeMessages(
slot, beaconBlockRoot, duties)
except ValidatorApiError as exc:
warn "Unable to proceed sync committee messages", slot = slot,
warn "Unable to proceed sync committee messages",
duties_count = len(duties), reason = exc.getFailureReason()
return
except CancelledError:
debug "Sync committee producing process was interrupted"
debug "Sync committee messages production was interrupted"
return
except CatchableError as exc:
error "Unexpected error while producing sync committee messages",
slot = slot,
duties_count = len(duties),
err_name = exc.name, err_msg = exc.msg
duties_count = len(duties), error = exc.name, reason = exc.msg
return
let contributionTime =
# chronos.Duration subtraction cannot return a negative value; in such
# case it will return `ZeroDuration`.
vc.beaconClock.durationToNextSlot() - OneThirdDuration
if contributionTime != ZeroDuration:
await sleepAsync(contributionTime)
let currentTime = vc.beaconClock.now()
if slot.sync_contribution_deadline() > currentTime:
let waitDur =
nanoseconds((slot.sync_contribution_deadline() - currentTime).nanoseconds)
# Sleeping until `sync_contribution_deadline`.
debug "Waiting for sync contribution deadline", wait_time = waitDur
await sleepAsync(waitDur)
block:
let delay = vc.getDelay(slot.sync_contribution_deadline())
debug "Producing contribution and proofs", delay = delay
await service.produceAndPublishContributions(slot, beaconBlockRoot, duties)
try:
await service.produceAndPublishContributions(slot, beaconBlockRoot, duties)
except CancelledError:
debug "Sync committee contributions production was interrupted"
return
except CatchableError as exc:
error "Unexpected error while producing sync committee contributions",
duties_count = len(duties), error = exc.name, reason = exc.msg
return
proc processSyncCommitteeTasks(service: SyncCommitteeServiceRef,
slot: Slot) {.async.} =
@ -409,12 +433,15 @@ proc processSyncCommitteeTasks(service: SyncCommitteeServiceRef,
duties = vc.getSyncCommitteeDutiesForSlot(slot + 1)
timeout = vc.beaconClock.durationToNextSlot()
logScope:
slot = slot
try:
await service.publishSyncMessagesAndContributions(slot,
duties).wait(timeout)
except AsyncTimeoutError:
warn "Unable to publish sync committee messages and contributions in time",
slot = slot, timeout = timeout
timeout = timeout
except CancelledError as exc:
debug "Sync committee publish task has been interrupted"
raise exc
@ -439,8 +466,8 @@ proc mainLoop(service: SyncCommitteeServiceRef) {.async.} =
debug "Service interrupted"
return
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
warn "Service crashed with unexpected error", error = exc.name,
reason = exc.msg
return
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
@ -468,8 +495,8 @@ proc mainLoop(service: SyncCommitteeServiceRef) {.async.} =
debug "Service interrupted"
true
except CatchableError as exc:
warn "Service crashed with unexpected error", err_name = exc.name,
err_msg = exc.msg
warn "Service crashed with unexpected error", error = exc.name,
reason = exc.msg
true
if breakLoop:
@ -481,7 +508,7 @@ proc init*(t: typedesc[SyncCommitteeServiceRef],
let res = SyncCommitteeServiceRef(name: ServiceName, client: vc,
state: ServiceState.Initialized)
debug "Initializing service"
return res
res
proc start*(service: SyncCommitteeServiceRef) =
service.lifeFut = mainLoop(service)