477 lines
17 KiB
Nim
477 lines
17 KiB
Nim
# beacon_chain
|
|
# Copyright (c) 2021-2024 Status Research & Development GmbH
|
|
# Licensed and distributed under either of
|
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
|
|
|
import
|
|
std/sets,
|
|
chronicles,
|
|
../validators/[activity_metrics, validator_duties],
|
|
"."/[common, api]
|
|
|
|
const
|
|
ServiceName = "attestation_service"
|
|
|
|
logScope: service = ServiceName
|
|
|
|
type
|
|
AggregateItem* = object
|
|
aggregator_index: uint64
|
|
selection_proof: ValidatorSig
|
|
validator: AttachedValidator
|
|
|
|
proc serveAttestation(
|
|
service: AttestationServiceRef, registered: RegisteredAttestation):
|
|
Future[bool] {.async.} =
|
|
let
|
|
vc = service.client
|
|
fork = vc.forkAtEpoch(registered.data.slot.epoch)
|
|
validator = registered.validator
|
|
|
|
logScope:
|
|
validator = validatorLog(validator)
|
|
|
|
let attestation = block:
|
|
let signature =
|
|
try:
|
|
let res = await validator.getAttestationSignature(
|
|
fork, vc.beaconGenesis.genesis_validators_root, registered.data)
|
|
if res.isErr():
|
|
warn "Unable to sign attestation", reason = res.error()
|
|
return false
|
|
res.get()
|
|
except CancelledError as exc:
|
|
debug "Attestation signature process was interrupted"
|
|
raise exc
|
|
except CatchableError as exc:
|
|
error "An unexpected error occurred while signing attestation",
|
|
err_name = exc.name, err_msg = exc.msg
|
|
return false
|
|
registered.toAttestation(signature)
|
|
|
|
logScope:
|
|
attestation = shortLog(attestation)
|
|
delay = vc.getDelay(registered.data.slot.attestation_deadline())
|
|
|
|
debug "Sending attestation"
|
|
|
|
validator.doppelgangerActivity(attestation.data.slot.epoch)
|
|
|
|
let res =
|
|
try:
|
|
await vc.submitPoolAttestations(@[attestation], ApiStrategyKind.First)
|
|
except ValidatorApiError as exc:
|
|
warn "Unable to publish attestation", reason = exc.getFailureReason()
|
|
return false
|
|
except CancelledError as exc:
|
|
debug "Attestation publishing process was interrupted"
|
|
raise exc
|
|
except CatchableError as exc:
|
|
error "Unexpected error occured while publishing attestation",
|
|
err_name = exc.name, err_msg = exc.msg
|
|
return false
|
|
|
|
if res:
|
|
let delay = vc.getDelay(attestation.data.slot.attestation_deadline())
|
|
beacon_attestations_sent.inc()
|
|
beacon_attestation_sent_delay.observe(delay.toFloatSeconds())
|
|
notice "Attestation published"
|
|
else:
|
|
warn "Attestation was not accepted by beacon node"
|
|
return res
|
|
|
|
proc serveAggregateAndProof*(service: AttestationServiceRef,
|
|
proof: AggregateAndProof,
|
|
validator: AttachedValidator): Future[bool] {.
|
|
async.} =
|
|
let
|
|
vc = service.client
|
|
genesisRoot = vc.beaconGenesis.genesis_validators_root
|
|
slot = proof.aggregate.data.slot
|
|
fork = vc.forkAtEpoch(slot.epoch)
|
|
|
|
logScope:
|
|
validator = validatorLog(validator)
|
|
attestation = shortLog(proof.aggregate)
|
|
|
|
debug "Signing aggregate", fork = fork
|
|
|
|
let signature =
|
|
try:
|
|
let res =
|
|
await validator.getAggregateAndProofSignature(fork, genesisRoot, proof)
|
|
if res.isErr():
|
|
warn "Unable to sign aggregate and proof using remote signer",
|
|
reason = res.error()
|
|
return false
|
|
res.get()
|
|
except CancelledError as exc:
|
|
debug "Aggregated attestation signing process was interrupted"
|
|
raise exc
|
|
except CatchableError as exc:
|
|
error "Unexpected error occured while signing aggregated attestation",
|
|
err_name = exc.name, err_msg = exc.msg
|
|
return false
|
|
|
|
let signedProof = SignedAggregateAndProof(message: proof,
|
|
signature: signature)
|
|
logScope:
|
|
delay = vc.getDelay(slot.aggregate_deadline())
|
|
|
|
debug "Sending aggregated attestation", fork = fork
|
|
|
|
validator.doppelgangerActivity(proof.aggregate.data.slot.epoch)
|
|
|
|
let res =
|
|
try:
|
|
await vc.publishAggregateAndProofs(@[signedProof], ApiStrategyKind.First)
|
|
except ValidatorApiError as exc:
|
|
warn "Unable to publish aggregated attestation",
|
|
reason = exc.getFailureReason()
|
|
return false
|
|
except CancelledError as exc:
|
|
debug "Publish aggregate and proofs request was interrupted"
|
|
raise exc
|
|
except CatchableError as exc:
|
|
error "Unexpected error occured while publishing aggregated attestation",
|
|
err_name = exc.name, err_msg = exc.msg
|
|
return false
|
|
|
|
if res:
|
|
beacon_aggregates_sent.inc()
|
|
notice "Aggregated attestation published"
|
|
else:
|
|
warn "Aggregated attestation was not accepted by beacon node"
|
|
return res
|
|
|
|
proc produceAndPublishAttestations*(service: AttestationServiceRef,
|
|
slot: Slot, committee_index: CommitteeIndex,
|
|
duties: seq[DutyAndProof]
|
|
): Future[AttestationData] {.
|
|
async.} =
|
|
doAssert(MAX_VALIDATORS_PER_COMMITTEE <= uint64(high(int)))
|
|
let
|
|
vc = service.client
|
|
fork = vc.forkAtEpoch(slot.epoch)
|
|
|
|
# This call could raise ValidatorApiError, but it is handled in
|
|
# publishAttestationsAndAggregates().
|
|
let data = await vc.produceAttestationData(slot, committee_index,
|
|
ApiStrategyKind.Best)
|
|
|
|
let registeredRes = vc.attachedValidators[].slashingProtection.withContext:
|
|
var tmp: seq[RegisteredAttestation]
|
|
for duty in duties:
|
|
if (duty.data.slot != data.slot) or
|
|
(uint64(duty.data.committee_index) != data.index):
|
|
warn "Inconsistent validator duties during attestation signing",
|
|
validator = shortLog(duty.data.pubkey),
|
|
duty_slot = duty.data.slot,
|
|
duty_index = duty.data.committee_index,
|
|
attestation_slot = data.slot, attestation_index = data.index
|
|
continue
|
|
|
|
let validator = vc.getValidatorForDuties(
|
|
duty.data.pubkey, duty.data.slot).valueOr:
|
|
continue
|
|
|
|
doAssert(validator.index.isSome())
|
|
let validator_index = validator.index.get()
|
|
|
|
logScope:
|
|
validator = validatorLog(validator)
|
|
|
|
# TODO: signing_root is recomputed in getAttestationSignature just after,
|
|
# but not for locally attached validators.
|
|
let
|
|
signingRoot = compute_attestation_signing_root(
|
|
fork, vc.beaconGenesis.genesis_validators_root, data)
|
|
registered = registerAttestationInContext(
|
|
validator_index, validator.pubkey, data.source.epoch,
|
|
data.target.epoch, signingRoot)
|
|
if registered.isErr():
|
|
warn "Slashing protection activated for attestation",
|
|
attestationData = shortLog(data),
|
|
signingRoot = shortLog(signingRoot),
|
|
badVoteDetails = $registered.error()
|
|
continue
|
|
|
|
tmp.add(RegisteredAttestation(
|
|
validator: validator,
|
|
index_in_committee: duty.data.validator_committee_index,
|
|
committee_len: int duty.data.committee_length,
|
|
data: data
|
|
))
|
|
tmp
|
|
|
|
if registeredRes.isErr():
|
|
warn "Could not update slashing database, skipping attestation duties",
|
|
error = registeredRes.error()
|
|
else:
|
|
let
|
|
pendingAttestations = registeredRes[].mapIt(service.serveAttestation(it))
|
|
statistics =
|
|
block:
|
|
var errored, succeed, failed = 0
|
|
try:
|
|
await allFutures(pendingAttestations)
|
|
except CancelledError as exc:
|
|
let pending = pendingAttestations
|
|
.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
|
|
await noCancel allFutures(pending)
|
|
raise exc
|
|
|
|
for future in pendingAttestations:
|
|
if future.completed():
|
|
if future.read():
|
|
inc(succeed)
|
|
else:
|
|
inc(failed)
|
|
else:
|
|
inc(errored)
|
|
(succeed, errored, failed)
|
|
|
|
let delay = vc.getDelay(slot.attestation_deadline())
|
|
debug "Attestation statistics", total = len(pendingAttestations),
|
|
succeed = statistics[0], failed_to_deliver = statistics[1],
|
|
not_accepted = statistics[2], delay = delay, slot = slot,
|
|
committee_index = committee_index, duties_count = len(duties)
|
|
|
|
return data
|
|
|
|
proc produceAndPublishAggregates(service: AttestationServiceRef,
|
|
adata: AttestationData,
|
|
duties: seq[DutyAndProof]) {.async.} =
|
|
let
|
|
vc = service.client
|
|
slot = adata.slot
|
|
committeeIndex = adata.index
|
|
attestationRoot = adata.hash_tree_root()
|
|
|
|
let aggregateItems =
|
|
block:
|
|
var res: seq[AggregateItem]
|
|
for duty in duties:
|
|
let validator = vc.getValidatorForDuties(
|
|
duty.data.pubkey, slot).valueOr:
|
|
continue
|
|
|
|
if (duty.data.slot != slot) or
|
|
(duty.data.committee_index != committeeIndex):
|
|
warn "Inconsistent validator duties during aggregate signing",
|
|
duty_slot = duty.data.slot, slot = slot,
|
|
duty_committee_index = duty.data.committee_index,
|
|
committee_index = committeeIndex
|
|
continue
|
|
if duty.slotSig.isSome():
|
|
let slotSignature = duty.slotSig.get()
|
|
if is_aggregator(duty.data.committee_length, slotSignature):
|
|
res.add(AggregateItem(
|
|
aggregator_index: uint64(duty.data.validator_index),
|
|
selection_proof: slotSignature,
|
|
validator: validator
|
|
))
|
|
res
|
|
|
|
if len(aggregateItems) > 0:
|
|
let aggAttestation =
|
|
try:
|
|
await vc.getAggregatedAttestation(slot, attestationRoot,
|
|
ApiStrategyKind.Best)
|
|
except ValidatorApiError as exc:
|
|
warn "Unable to get aggregated attestation data", slot = slot,
|
|
attestation_root = shortLog(attestationRoot),
|
|
reason = exc.getFailureReason()
|
|
return
|
|
except CancelledError as exc:
|
|
debug "Aggregated attestation request was interrupted"
|
|
raise exc
|
|
except CatchableError as exc:
|
|
error "Unexpected error occured while getting aggregated attestation",
|
|
slot = slot, attestation_root = shortLog(attestationRoot),
|
|
err_name = exc.name, err_msg = exc.msg
|
|
return
|
|
|
|
let pendingAggregates =
|
|
block:
|
|
var res: seq[Future[bool]]
|
|
for item in aggregateItems:
|
|
let proof = AggregateAndProof(
|
|
aggregator_index: item.aggregator_index,
|
|
aggregate: aggAttestation,
|
|
selection_proof: item.selection_proof
|
|
)
|
|
res.add(service.serveAggregateAndProof(proof, item.validator))
|
|
res
|
|
|
|
let statistics =
|
|
block:
|
|
var errored, succeed, failed = 0
|
|
try:
|
|
await allFutures(pendingAggregates)
|
|
except CancelledError as exc:
|
|
let pending = pendingAggregates
|
|
.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
|
|
await noCancel allFutures(pending)
|
|
raise exc
|
|
|
|
for future in pendingAggregates:
|
|
if future.completed():
|
|
if future.read():
|
|
inc(succeed)
|
|
else:
|
|
inc(failed)
|
|
else:
|
|
inc(errored)
|
|
(succeed, errored, failed)
|
|
|
|
let delay = vc.getDelay(slot.aggregate_deadline())
|
|
debug "Aggregated attestation statistics", total = len(pendingAggregates),
|
|
succeed = statistics[0], failed_to_deliver = statistics[1],
|
|
not_accepted = statistics[2], delay = delay, slot = slot,
|
|
committee_index = committeeIndex
|
|
|
|
else:
|
|
debug "No aggregate and proofs scheduled for slot", slot = slot,
|
|
committee_index = committeeIndex
|
|
|
|
proc publishAttestationsAndAggregates(service: AttestationServiceRef,
|
|
slot: Slot,
|
|
committee_index: CommitteeIndex,
|
|
duties: seq[DutyAndProof]) {.async.} =
|
|
let vc = service.client
|
|
|
|
block:
|
|
let delay = vc.getDelay(slot.attestation_deadline())
|
|
debug "Producing attestations", delay = delay, slot = slot,
|
|
committee_index = committee_index,
|
|
duties_count = len(duties)
|
|
let ad =
|
|
try:
|
|
await service.produceAndPublishAttestations(slot, committee_index, duties)
|
|
except ValidatorApiError as exc:
|
|
warn "Unable to proceed attestations", slot = slot,
|
|
committee_index = committee_index, duties_count = len(duties),
|
|
reason = exc.getFailureReason()
|
|
return
|
|
except CancelledError as exc:
|
|
debug "Publish attestation request was interrupted"
|
|
raise exc
|
|
except CatchableError as exc:
|
|
error "Unexpected error while producing attestations", slot = slot,
|
|
committee_index = committee_index, duties_count = len(duties),
|
|
err_name = exc.name, err_msg = exc.msg
|
|
return
|
|
|
|
let aggregateTime =
|
|
# chronos.Duration substraction could not return negative value, in such
|
|
# case it will return `ZeroDuration`.
|
|
vc.beaconClock.durationToNextSlot() - OneThirdDuration
|
|
if aggregateTime != ZeroDuration:
|
|
await sleepAsync(aggregateTime)
|
|
|
|
block:
|
|
let delay = vc.getDelay(slot.aggregate_deadline())
|
|
debug "Producing aggregate and proofs", delay = delay
|
|
await service.produceAndPublishAggregates(ad, duties)
|
|
|
|
proc spawnAttestationTasks(service: AttestationServiceRef,
|
|
slot: Slot) {.async.} =
|
|
let vc = service.client
|
|
let dutiesByCommittee =
|
|
block:
|
|
var res: Table[CommitteeIndex, seq[DutyAndProof]]
|
|
let attesters = vc.getAttesterDutiesForSlot(slot)
|
|
var default: seq[DutyAndProof]
|
|
for item in attesters:
|
|
res.mgetOrPut(item.data.committee_index, default).add(item)
|
|
res
|
|
|
|
# Waiting for blocks to be published before attesting.
|
|
await vc.waitForBlock(slot, attestationSlotOffset)
|
|
|
|
var tasks: seq[Future[void]]
|
|
try:
|
|
for index, duties in dutiesByCommittee:
|
|
tasks.add(service.publishAttestationsAndAggregates(slot, index, duties))
|
|
let timeout = vc.beaconClock.durationToNextSlot()
|
|
await allFutures(tasks).wait(timeout)
|
|
except AsyncTimeoutError:
|
|
# Cancelling all the pending tasks.
|
|
let pending = tasks.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
|
|
await allFutures(pending)
|
|
except CancelledError as exc:
|
|
# Cancelling all the pending tasks.
|
|
let pending = tasks.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
|
|
await noCancel allFutures(pending)
|
|
raise exc
|
|
except CatchableError as exc:
|
|
error "Unexpected error while processing attestation duties",
|
|
error_name = exc.name, error_message = exc.msg
|
|
|
|
proc mainLoop(service: AttestationServiceRef) {.async.} =
|
|
let vc = service.client
|
|
service.state = ServiceState.Running
|
|
debug "Service started"
|
|
|
|
debug "Attester loop is waiting for initialization"
|
|
try:
|
|
await allFutures(
|
|
vc.preGenesisEvent.wait(),
|
|
vc.genesisEvent.wait(),
|
|
vc.indicesAvailable.wait(),
|
|
vc.forksAvailable.wait()
|
|
)
|
|
except CancelledError:
|
|
debug "Service interrupted"
|
|
return
|
|
except CatchableError as exc:
|
|
warn "Service crashed with unexpected error", err_name = exc.name,
|
|
err_msg = exc.msg
|
|
return
|
|
|
|
doAssert(len(vc.forks) > 0, "Fork schedule must not be empty at this point")
|
|
|
|
var currentSlot: Opt[Slot]
|
|
while true:
|
|
# This loop could look much more nicer/better, when
|
|
# https://github.com/nim-lang/Nim/issues/19911 will be fixed, so it could
|
|
# become safe to combine loops, breaks and exception handlers.
|
|
let breakLoop =
|
|
try:
|
|
let
|
|
# We use zero offset here, because we do waiting in
|
|
# waitForBlock(attestationSlotOffset).
|
|
slot = await vc.checkedWaitForNextSlot(currentSlot,
|
|
ZeroTimeDiff, false)
|
|
if slot.isNone():
|
|
debug "System time adjusted backwards significantly, exiting"
|
|
true
|
|
else:
|
|
currentSlot = slot
|
|
await service.spawnAttestationTasks(currentSlot.get())
|
|
false
|
|
except CancelledError:
|
|
debug "Service interrupted"
|
|
true
|
|
except CatchableError as exc:
|
|
warn "Service crashed with unexpected error", err_name = exc.name,
|
|
err_msg = exc.msg
|
|
true
|
|
|
|
if breakLoop:
|
|
break
|
|
|
|
proc init*(t: typedesc[AttestationServiceRef],
|
|
vc: ValidatorClientRef): Future[AttestationServiceRef] {.async.} =
|
|
logScope: service = ServiceName
|
|
let res = AttestationServiceRef(name: ServiceName,
|
|
client: vc, state: ServiceState.Initialized)
|
|
debug "Initializing service"
|
|
return res
|
|
|
|
proc start*(service: AttestationServiceRef) =
|
|
service.lifeFut = mainLoop(service)
|