514 lines
18 KiB
Nim
514 lines
18 KiB
Nim
# beacon_chain
|
|
# Copyright (c) 2022-2024 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
{.push raises: [].}
|
|
|
|
import
|
|
std/sets,
|
|
metrics, chronicles,
|
|
../spec/datatypes/[phase0, altair, bellatrix],
|
|
../spec/eth2_apis/rest_types,
|
|
../validators/activity_metrics,
|
|
"."/[common, api, selection_proofs]
|
|
|
|
const
|
|
ServiceName = "sync_committee_service"
|
|
|
|
logScope: service = ServiceName
|
|
|
|
type
|
|
ContributionItem* = object
|
|
aggregator_index: uint64
|
|
selection_proof: ValidatorSig
|
|
validator: AttachedValidator
|
|
subcommitteeIdx: SyncSubcommitteeIndex
|
|
|
|
proc serveSyncCommitteeMessage*(service: SyncCommitteeServiceRef,
|
|
slot: Slot, beaconBlockRoot: Eth2Digest,
|
|
duty: SyncCommitteeDuty): Future[bool] {.
|
|
async.} =
|
|
let
|
|
vc = service.client
|
|
startTime = Moment.now()
|
|
fork = vc.forkAtEpoch(slot.epoch)
|
|
genesisValidatorsRoot = vc.beaconGenesis.genesis_validators_root
|
|
vindex = duty.validator_index
|
|
validator = vc.getValidatorForDuties(
|
|
duty.pubkey, slot, slashingSafe = true).valueOr: return false
|
|
|
|
logScope:
|
|
validator = validatorLog(validator)
|
|
block_root = shortLog(beaconBlockRoot)
|
|
slot = slot
|
|
|
|
let
|
|
message =
|
|
block:
|
|
let res = await getSyncCommitteeMessage(validator, fork,
|
|
genesisValidatorsRoot,
|
|
slot, beaconBlockRoot)
|
|
if res.isErr():
|
|
warn "Unable to sign committee message using remote signer"
|
|
return
|
|
res.get()
|
|
|
|
logScope:
|
|
message = shortLog(message)
|
|
|
|
debug "Sending sync committee message",
|
|
delay = vc.getDelay(message.slot.sync_committee_message_deadline())
|
|
|
|
let res =
|
|
try:
|
|
await vc.submitPoolSyncCommitteeSignature(message, ApiStrategyKind.First)
|
|
except ValidatorApiError as exc:
|
|
warn "Unable to publish sync committee message",
|
|
reason = exc.getFailureReason()
|
|
return false
|
|
except CancelledError:
|
|
debug "Publish sync committee message request was interrupted"
|
|
return false
|
|
except CatchableError as exc:
|
|
error "Unexpected error occurred while publishing sync committee message",
|
|
error = exc.name, reason = exc.msg
|
|
return false
|
|
|
|
let
|
|
delay = vc.getDelay(message.slot.sync_committee_message_deadline())
|
|
dur = Moment.now() - startTime
|
|
|
|
if res:
|
|
beacon_sync_committee_messages_sent.inc()
|
|
beacon_sync_committee_message_sent_delay.observe(delay.toFloatSeconds())
|
|
notice "Sync committee message published",
|
|
validator_index = vindex, delay = delay, duration = dur
|
|
else:
|
|
warn "Sync committee message was not accepted by beacon node",
|
|
validator_index = vindex, delay = delay, duration = dur
|
|
res
|
|
|
|
proc produceAndPublishSyncCommitteeMessages(service: SyncCommitteeServiceRef,
|
|
slot: Slot,
|
|
beaconBlockRoot: Eth2Digest,
|
|
duties: seq[SyncCommitteeDuty])
|
|
{.async.} =
|
|
let
|
|
vc = service.client
|
|
startTime = Moment.now()
|
|
|
|
let pendingSyncCommitteeMessages =
|
|
block:
|
|
var res: seq[Future[bool]]
|
|
for duty in duties:
|
|
debug "Serving sync message duty", duty, epoch = slot.epoch()
|
|
res.add(service.serveSyncCommitteeMessage(slot,
|
|
beaconBlockRoot,
|
|
duty))
|
|
res
|
|
|
|
let statistics =
|
|
block:
|
|
var errored, succeed, failed = 0
|
|
try:
|
|
await allFutures(pendingSyncCommitteeMessages)
|
|
except CancelledError as exc:
|
|
let pending = pendingSyncCommitteeMessages
|
|
.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
|
|
await noCancel allFutures(pending)
|
|
raise exc
|
|
|
|
for future in pendingSyncCommitteeMessages:
|
|
if future.completed():
|
|
if future.read():
|
|
inc(succeed)
|
|
else:
|
|
inc(failed)
|
|
else:
|
|
inc(errored)
|
|
(succeed, errored, failed)
|
|
|
|
let
|
|
delay = vc.getDelay(slot.attestation_deadline())
|
|
dur = Moment.now() - startTime
|
|
|
|
debug "Sync committee message statistics",
|
|
total = len(pendingSyncCommitteeMessages),
|
|
succeed = statistics[0], failed_to_deliver = statistics[1],
|
|
not_accepted = statistics[2], delay = delay, duration = dur,
|
|
slot = slot, duties_count = len(duties)
|
|
|
|
proc serveContributionAndProof*(service: SyncCommitteeServiceRef,
|
|
proof: ContributionAndProof,
|
|
validator: AttachedValidator): Future[bool] {.
|
|
async.} =
|
|
## Signs ConributionAndProof object and sends it to BN.
|
|
let
|
|
vc = service.client
|
|
startTime = Moment.now()
|
|
slot = proof.contribution.slot
|
|
genesisRoot = vc.beaconGenesis.genesis_validators_root
|
|
fork = vc.forkAtEpoch(slot.epoch)
|
|
|
|
logScope:
|
|
validator = validatorLog(validator)
|
|
contribution = shortLog(proof.contribution)
|
|
|
|
let signature =
|
|
block:
|
|
let res =
|
|
try:
|
|
await validator.getContributionAndProofSignature(
|
|
fork, genesisRoot, proof)
|
|
except CancelledError:
|
|
debug "Sync contribution signing process was interrupted"
|
|
return false
|
|
except CatchableError as exc:
|
|
error "Unexpected error occurred while signing sync contribution",
|
|
error = exc.name, reason = exc.msg
|
|
return false
|
|
|
|
if res.isErr():
|
|
warn "Unable to sign sync committee contribution using remote signer",
|
|
reason = res.error()
|
|
return false
|
|
res.get()
|
|
|
|
debug "Sending sync contribution",
|
|
delay = vc.getDelay(slot.sync_contribution_deadline())
|
|
|
|
let restSignedProof = RestSignedContributionAndProof.init(
|
|
proof, signature)
|
|
|
|
let res =
|
|
try:
|
|
await vc.publishContributionAndProofs(@[restSignedProof],
|
|
ApiStrategyKind.First)
|
|
except ValidatorApiError as exc:
|
|
warn "Unable to publish sync contribution",
|
|
reason = exc.getFailureReason()
|
|
false
|
|
except CancelledError:
|
|
debug "Publication process of sync contribution was interrupted"
|
|
return false
|
|
except CatchableError as err:
|
|
error "Unexpected error occurred while publishing sync contribution",
|
|
error = err.name, reason = err.msg
|
|
false
|
|
|
|
let dur = Moment.now() - startTime
|
|
if res:
|
|
beacon_sync_committee_contributions_sent.inc()
|
|
notice "Sync contribution published", duration = dur
|
|
else:
|
|
warn "Sync contribution was not accepted by beacon node", duration = dur
|
|
res
|
|
|
|
proc produceAndPublishContributions(service: SyncCommitteeServiceRef,
|
|
slot: Slot,
|
|
beaconBlockRoot: Eth2Digest,
|
|
duties: seq[SyncCommitteeDuty]) {.async.} =
|
|
let
|
|
vc = service.client
|
|
startTime = Moment.now()
|
|
|
|
logScope:
|
|
slot = slot
|
|
block_root = shortLog(beaconBlockRoot)
|
|
|
|
var (contributions, pendingFutures, contributionsMap) =
|
|
block:
|
|
var
|
|
resItems: seq[ContributionItem]
|
|
resFutures: seq[FutureBase]
|
|
resMap: array[SYNC_COMMITTEE_SUBNET_COUNT,
|
|
Future[SyncCommitteeContribution]]
|
|
for duty in duties:
|
|
let validator = vc.getValidatorForDuties(duty.pubkey, slot).valueOr:
|
|
continue
|
|
if validator.index.isNone():
|
|
continue
|
|
for inindex in duty.validator_sync_committee_indices:
|
|
let
|
|
subCommitteeIdx = getSubcommitteeIndex(inindex)
|
|
signature =
|
|
vc.getSyncCommitteeSelectionProof(duty.pubkey,
|
|
slot, inindex).valueOr:
|
|
continue
|
|
|
|
if is_sync_committee_aggregator(signature):
|
|
resItems.add(ContributionItem(
|
|
aggregator_index: uint64(validator.index.get()),
|
|
selection_proof: signature,
|
|
validator: validator,
|
|
subcommitteeIdx: subCommitteeIdx
|
|
))
|
|
if isNil(resMap[subCommitteeIdx]):
|
|
let future =
|
|
vc.produceSyncCommitteeContribution(
|
|
slot, subCommitteeIdx, beaconBlockRoot, ApiStrategyKind.Best)
|
|
resMap[int(subCommitteeIdx)] = future
|
|
resFutures.add(FutureBase(future))
|
|
(resItems, resFutures, resMap)
|
|
|
|
if len(contributions) > 0:
|
|
let
|
|
pendingAggregates =
|
|
block:
|
|
var res: seq[Future[bool]]
|
|
while len(pendingFutures) > 0:
|
|
try:
|
|
discard await race(pendingFutures)
|
|
except CancelledError as exc:
|
|
let pending = pendingFutures
|
|
.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
|
|
await noCancel allFutures(pending)
|
|
raise exc
|
|
|
|
var completed: seq[int]
|
|
for contrib in contributions:
|
|
let future = contributionsMap[contrib.subcommitteeIdx]
|
|
doAssert(not(isNil(future)))
|
|
let index = pendingFutures.find(FutureBase(future))
|
|
if future.finished() and (index >= 0):
|
|
if index notin completed: completed.add(index)
|
|
let aggContribution =
|
|
try:
|
|
Opt.some(future.read())
|
|
except ValidatorApiError as exc:
|
|
warn "Unable to get sync message contribution data",
|
|
reason = exc.getFailureReason()
|
|
Opt.none(SyncCommitteeContribution)
|
|
except CancelledError as exc:
|
|
debug "Request for sync message contribution was " &
|
|
"interrupted"
|
|
raise exc
|
|
except CatchableError as exc:
|
|
error "Unexpected error occurred while getting sync " &
|
|
"message contribution",
|
|
error = exc.name, reason = exc.msg
|
|
Opt.none(SyncCommitteeContribution)
|
|
|
|
if aggContribution.isSome():
|
|
let proof = ContributionAndProof(
|
|
aggregator_index: contrib.aggregator_index,
|
|
contribution: aggContribution.get(),
|
|
selection_proof: contrib.selection_proof
|
|
)
|
|
res.add(
|
|
service.serveContributionAndProof(proof, contrib.validator))
|
|
|
|
pendingFutures =
|
|
block:
|
|
var res: seq[FutureBase]
|
|
for index, value in pendingFutures.pairs():
|
|
if index notin completed: res.add(value)
|
|
res
|
|
res
|
|
statistics =
|
|
block:
|
|
var errored, succeed, failed = 0
|
|
try:
|
|
await allFutures(pendingAggregates)
|
|
except CancelledError as exc:
|
|
let pending = pendingAggregates
|
|
.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
|
|
await noCancel allFutures(pending)
|
|
raise exc
|
|
|
|
for future in pendingAggregates:
|
|
if future.completed():
|
|
if future.read():
|
|
inc(succeed)
|
|
else:
|
|
inc(failed)
|
|
else:
|
|
inc(errored)
|
|
(succeed, errored, failed)
|
|
|
|
let
|
|
delay = vc.getDelay(slot.aggregate_deadline())
|
|
dur = Moment.now() - startTime
|
|
|
|
debug "Sync message contribution statistics",
|
|
total = len(contributions),
|
|
succeed = statistics[0],
|
|
failed_to_create = len(pendingAggregates) - len(contributions),
|
|
failed_to_deliver = statistics[1],
|
|
not_accepted = statistics[2],
|
|
delay = delay, duration = dur
|
|
|
|
else:
|
|
debug "No contribution and proofs scheduled for the slot"
|
|
|
|
proc publishSyncMessagesAndContributions(service: SyncCommitteeServiceRef,
|
|
slot: Slot,
|
|
duties: seq[SyncCommitteeDuty]) {.
|
|
async.} =
|
|
let vc = service.client
|
|
|
|
await vc.waitForBlock(slot, syncCommitteeMessageSlotOffset)
|
|
|
|
logScope:
|
|
slot = slot
|
|
|
|
block:
|
|
let delay = vc.getDelay(slot.sync_committee_message_deadline())
|
|
debug "Producing sync committee messages", delay = delay,
|
|
duties_count = len(duties)
|
|
|
|
let beaconBlockRoot =
|
|
block:
|
|
try:
|
|
let res = await vc.getHeadBlockRoot(ApiStrategyKind.Best)
|
|
if res.execution_optimistic.isNone():
|
|
## The `execution_optimistic` is missing from the response, we assume
|
|
## that the BN is unaware optimistic sync, so we consider the BN
|
|
## to be synchronized with the network.
|
|
## TODO (cheatfate): This should be removed when VC will be able to
|
|
## handle getSpec() API call with fork constants.
|
|
res.data.root
|
|
else:
|
|
if res.execution_optimistic.get():
|
|
notice "Execution client not in sync"
|
|
return
|
|
res.data.root
|
|
except ValidatorApiError as exc:
|
|
warn "Unable to retrieve head block's root to sign", reason = exc.msg,
|
|
reason = exc.getFailureReason()
|
|
return
|
|
except CancelledError:
|
|
debug "Block root request was interrupted"
|
|
return
|
|
except CatchableError as exc:
|
|
error "Unexpected error while requesting sync message block root",
|
|
error = exc.name, reason = exc.msg
|
|
return
|
|
|
|
try:
|
|
await service.produceAndPublishSyncCommitteeMessages(
|
|
slot, beaconBlockRoot, duties)
|
|
except ValidatorApiError as exc:
|
|
warn "Unable to proceed sync committee messages",
|
|
duties_count = len(duties), reason = exc.getFailureReason()
|
|
return
|
|
except CancelledError:
|
|
debug "Sync committee messages production was interrupted"
|
|
return
|
|
except CatchableError as exc:
|
|
error "Unexpected error while producing sync committee messages",
|
|
duties_count = len(duties), error = exc.name, reason = exc.msg
|
|
return
|
|
|
|
let currentTime = vc.beaconClock.now()
|
|
if slot.sync_contribution_deadline() > currentTime:
|
|
let waitDur =
|
|
nanoseconds((slot.sync_contribution_deadline() - currentTime).nanoseconds)
|
|
# Sleeping until `sync_contribution_deadline`.
|
|
debug "Waiting for sync contribution deadline", wait_time = waitDur
|
|
await sleepAsync(waitDur)
|
|
|
|
block:
|
|
let delay = vc.getDelay(slot.sync_contribution_deadline())
|
|
debug "Producing contribution and proofs", delay = delay
|
|
|
|
try:
|
|
await service.produceAndPublishContributions(slot, beaconBlockRoot, duties)
|
|
except CancelledError:
|
|
debug "Sync committee contributions production was interrupted"
|
|
return
|
|
except CatchableError as exc:
|
|
error "Unexpected error while producing sync committee contributions",
|
|
duties_count = len(duties), error = exc.name, reason = exc.msg
|
|
return
|
|
|
|
proc processSyncCommitteeTasks(service: SyncCommitteeServiceRef,
|
|
slot: Slot) {.async.} =
|
|
let
|
|
vc = service.client
|
|
duties = vc.getSyncCommitteeDutiesForSlot(slot + 1)
|
|
timeout = vc.beaconClock.durationToNextSlot()
|
|
|
|
logScope:
|
|
slot = slot
|
|
|
|
try:
|
|
await service.publishSyncMessagesAndContributions(slot,
|
|
duties).wait(timeout)
|
|
except AsyncTimeoutError:
|
|
warn "Unable to publish sync committee messages and contributions in time",
|
|
timeout = timeout
|
|
except CancelledError as exc:
|
|
debug "Sync committee publish task has been interrupted"
|
|
raise exc
|
|
except CatchableError as exc:
|
|
error "Unexpected error encountered while processing sync committee tasks",
|
|
error_name = exc.name, error_message = exc.msg
|
|
|
|
proc mainLoop(service: SyncCommitteeServiceRef) {.async.} =
|
|
let vc = service.client
|
|
service.state = ServiceState.Running
|
|
debug "Service started"
|
|
|
|
debug "Sync committee processing loop is waiting for initialization"
|
|
try:
|
|
await allFutures(
|
|
vc.preGenesisEvent.wait(),
|
|
vc.genesisEvent.wait(),
|
|
vc.indicesAvailable.wait(),
|
|
vc.forksAvailable.wait()
|
|
)
|
|
except CancelledError:
|
|
debug "Service interrupted"
|
|
return
|
|
except CatchableError as exc:
|
|
warn "Service crashed with unexpected error", error = exc.name,
|
|
reason = exc.msg
|
|
return
|
|
|
|
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
|
|
|
|
var currentSlot: Opt[Slot]
|
|
while true:
|
|
# This loop could look much more nicer/better, when
|
|
# https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could
|
|
# become safe to combine loops, breaks and exception handlers.
|
|
let breakLoop =
|
|
try:
|
|
let
|
|
# We use zero offset here, because we do waiting in
|
|
# waitForBlock(syncCommitteeMessageSlotOffset).
|
|
slot = await vc.checkedWaitForNextSlot(currentSlot, ZeroTimeDiff,
|
|
false)
|
|
if slot.isNone():
|
|
debug "System time adjusted backwards significantly, exiting"
|
|
true
|
|
else:
|
|
currentSlot = slot
|
|
await service.processSyncCommitteeTasks(currentSlot.get())
|
|
false
|
|
except CancelledError:
|
|
debug "Service interrupted"
|
|
true
|
|
except CatchableError as exc:
|
|
warn "Service crashed with unexpected error", error = exc.name,
|
|
reason = exc.msg
|
|
true
|
|
|
|
if breakLoop:
|
|
break
|
|
|
|
proc init*(t: typedesc[SyncCommitteeServiceRef],
|
|
vc: ValidatorClientRef): Future[SyncCommitteeServiceRef] {.async.} =
|
|
logScope: service = ServiceName
|
|
let res = SyncCommitteeServiceRef(name: ServiceName, client: vc,
|
|
state: ServiceState.Initialized)
|
|
debug "Initializing service"
|
|
res
|
|
|
|
proc start*(service: SyncCommitteeServiceRef) =
|
|
service.lifeFut = mainLoop(service)
|