Fix and optimize aggregate and proofs generation algorithm. (#2722)
This commit is contained in:
parent
e4afc36d71
commit
754aeec2b0
|
@ -4,10 +4,16 @@ import common, api, block_service
|
|||
|
||||
logScope: service = "attestation_service"
|
||||
|
||||
type
|
||||
AggregateItem* = object
|
||||
aggregator_index: uint64
|
||||
selection_proof: ValidatorSig
|
||||
validator: AttachedValidator
|
||||
|
||||
proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
|
||||
duty: RestAttesterDuty): Future[bool] {.async.} =
|
||||
duty: DutyAndProof): Future[bool] {.async.} =
|
||||
let vc = service.client
|
||||
let validator = vc.attachedValidators.getValidator(duty.pubkey)
|
||||
let validator = vc.attachedValidators.getValidator(duty.data.pubkey)
|
||||
if validator.index.isNone():
|
||||
warn "Validator index is missing", validator = validator.pubKey
|
||||
return false
|
||||
|
@ -26,38 +32,65 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData,
|
|||
adata.source.epoch,
|
||||
adata.target.epoch, signingRoot)
|
||||
if notSlashable.isErr():
|
||||
warn "Slashing protection activated for attestation", slot = duty.slot,
|
||||
validator = validator.pubKey, validator_index = duty.validator_index,
|
||||
warn "Slashing protection activated for attestation", slot = duty.data.slot,
|
||||
validator = validator.pubKey,
|
||||
validator_index = duty.data.validator_index,
|
||||
badVoteDetails = $notSlashable.error
|
||||
return false
|
||||
|
||||
let attestation = await validator.produceAndSignAttestation(adata,
|
||||
int(duty.committee_length), Natural(duty.validator_committee_index),
|
||||
int(duty.data.committee_length),
|
||||
Natural(duty.data.validator_committee_index),
|
||||
fork, vc.beaconGenesis.genesis_validators_root)
|
||||
|
||||
let res =
|
||||
try:
|
||||
await vc.submitPoolAttestations(@[attestation])
|
||||
except ValidatorApiError as exc:
|
||||
error "Unable to submit attestation", slot = duty.slot,
|
||||
validator = validator.pubKey, validator_index = duty.validator_index
|
||||
error "Unable to submit attestation", slot = duty.data.slot,
|
||||
validator = validator.pubKey,
|
||||
validator_index = duty.data.validator_index
|
||||
raise exc
|
||||
|
||||
let delay = vc.getDelay(seconds(int64(SECONDS_PER_SLOT) div 3))
|
||||
if res:
|
||||
notice "Attestation published", validator = validator.pubKey,
|
||||
validator_index = duty.validator_index, slot = duty.slot,
|
||||
validator_index = duty.data.validator_index, slot = duty.data.slot,
|
||||
delay = delay
|
||||
return true
|
||||
else:
|
||||
warn "Attestation was not accepted by beacon node",
|
||||
validator = validator.pubKey, validator_index = duty.validator_index,
|
||||
slot = duty.slot, delay = delay
|
||||
validator = validator.pubKey,
|
||||
validator_index = duty.data.validator_index,
|
||||
slot = duty.data.slot, delay = delay
|
||||
return false
|
||||
|
||||
proc serveAggregateAndProof*(service: AttestationServiceRef,
|
||||
proof: AggregateAndProof,
|
||||
validator: AttachedValidator): Future[bool] {.
|
||||
async.} =
|
||||
let
|
||||
vc = service.client
|
||||
genesisRoot = vc.beaconGenesis.genesis_validators_root
|
||||
fork = vc.fork.get()
|
||||
|
||||
let signature = await signAggregateAndProof(validator, proof, fork,
|
||||
genesisRoot)
|
||||
let signedProof = SignedAggregateAndProof(message: proof,
|
||||
signature: signature)
|
||||
try:
|
||||
return await vc.publishAggregateAndProofs(@[signedProof]):
|
||||
except ValidatorApiError:
|
||||
warn "Unable to publish aggregate and proofs"
|
||||
return false
|
||||
except CatchableError as exc:
|
||||
error "Unexpected error happened", err_name = exc.name,
|
||||
err_msg = exc.msg
|
||||
return false
|
||||
|
||||
proc produceAndPublishAttestations*(service: AttestationServiceRef,
|
||||
slot: Slot, committee_index: CommitteeIndex,
|
||||
duties: seq[RestAttesterDuty]
|
||||
duties: seq[DutyAndProof]
|
||||
): Future[AttestationData] {.
|
||||
async.} =
|
||||
doAssert(MAX_VALIDATORS_PER_COMMITTEE <= uint64(high(int)))
|
||||
|
@ -71,12 +104,12 @@ proc produceAndPublishAttestations*(service: AttestationServiceRef,
|
|||
block:
|
||||
var res: seq[Future[bool]]
|
||||
for duty in duties:
|
||||
debug "Serving attestation duty", duty = duty, epoch = slot.epoch()
|
||||
if (duty.slot != ad.slot) or
|
||||
(uint64(duty.committee_index) != ad.index):
|
||||
debug "Serving attestation duty", duty = duty.data, epoch = slot.epoch()
|
||||
if (duty.data.slot != ad.slot) or
|
||||
(uint64(duty.data.committee_index) != ad.index):
|
||||
error "Inconsistent validator duties during attestation signing",
|
||||
validator = duty.pubkey, duty_slot = duty.slot,
|
||||
duty_index = duty.committee_index,
|
||||
validator = duty.data.pubkey, duty_slot = duty.data.slot,
|
||||
duty_index = duty.data.committee_index,
|
||||
attestation_slot = ad.slot, attestation_index = ad.index
|
||||
continue
|
||||
res.add(service.serveAttestation(ad, duty))
|
||||
|
@ -113,7 +146,7 @@ proc produceAndPublishAttestations*(service: AttestationServiceRef,
|
|||
|
||||
proc produceAndPublishAggregates(service: AttestationServiceRef,
|
||||
adata: AttestationData,
|
||||
duties: seq[RestAttesterDuty]) {.async.} =
|
||||
duties: seq[DutyAndProof]) {.async.} =
|
||||
let
|
||||
vc = service.client
|
||||
slot = adata.slot
|
||||
|
@ -121,61 +154,84 @@ proc produceAndPublishAggregates(service: AttestationServiceRef,
|
|||
attestationRoot = adata.hash_tree_root()
|
||||
genesisRoot = vc.beaconGenesis.genesis_validators_root
|
||||
|
||||
let aggAttestation =
|
||||
try:
|
||||
await vc.getAggregatedAttestation(slot, attestationRoot)
|
||||
except ValidatorApiError:
|
||||
error "Unable to retrieve aggregated attestation data"
|
||||
return
|
||||
|
||||
let aggregateAndProofs =
|
||||
let aggregateItems =
|
||||
block:
|
||||
var res: seq[SignedAggregateAndProof]
|
||||
var res: seq[AggregateItem]
|
||||
for duty in duties:
|
||||
let validator = vc.attachedValidators.getValidator(duty.pubkey)
|
||||
let slotSignature = await getSlotSig(validator, vc.fork.get(),
|
||||
genesisRoot, slot)
|
||||
if (duty.slot != slot) or (duty.committee_index != committeeIndex):
|
||||
error "Inconsistent validator duties during aggregate signing",
|
||||
duty_slot = duty.slot, slot = slot,
|
||||
duty_committee_index = duty.committee_index,
|
||||
committee_index = committeeIndex
|
||||
continue
|
||||
|
||||
if is_aggregator(duty.committee_length, slotSignature):
|
||||
notice "Aggregating", slot = slot, validator = duty.pubkey
|
||||
|
||||
let aggAndProof = AggregateAndProof(
|
||||
aggregator_index: uint64(duty.validator_index),
|
||||
aggregate: aggAttestation,
|
||||
selection_proof: slot_signature
|
||||
)
|
||||
let signature = await signAggregateAndProof(validator, aggAndProof,
|
||||
vc.fork.get(),
|
||||
genesisRoot)
|
||||
res.add(SignedAggregateAndProof(message: aggAndProof,
|
||||
signature: signature))
|
||||
let validator = vc.attachedValidators.getValidator(duty.data.pubkey)
|
||||
if not(isNil(validator)):
|
||||
if (duty.data.slot != slot) or
|
||||
(duty.data.committee_index != committeeIndex):
|
||||
error "Inconsistent validator duties during aggregate signing",
|
||||
duty_slot = duty.data.slot, slot = slot,
|
||||
duty_committee_index = duty.data.committee_index,
|
||||
committee_index = committeeIndex
|
||||
continue
|
||||
if duty.slotSig.isSome():
|
||||
let slotSignature = duty.slotSig.get()
|
||||
if is_aggregator(duty.data.committee_length, slotSignature):
|
||||
res.add(AggregateItem(
|
||||
aggregator_index: uint64(duty.data.validator_index),
|
||||
selection_proof: slotSignature,
|
||||
validator: validator
|
||||
))
|
||||
res
|
||||
|
||||
let count = len(aggregateAndProofs)
|
||||
if count > 0:
|
||||
let res =
|
||||
if len(aggregateItems) > 0:
|
||||
let aggAttestation =
|
||||
try:
|
||||
await vc.publishAggregateAndProofs(aggregateAndProofs)
|
||||
await vc.getAggregatedAttestation(slot, attestationRoot)
|
||||
except ValidatorApiError:
|
||||
warn "Unable to publish aggregate and proofs"
|
||||
error "Unable to retrieve aggregated attestation data"
|
||||
return
|
||||
if res:
|
||||
notice "Successfully published aggregate and proofs", count = count
|
||||
else:
|
||||
warn "Aggregate and proofs not accepted by beacon node", count = count
|
||||
|
||||
let pendingAggregates =
|
||||
block:
|
||||
var res: seq[Future[bool]]
|
||||
for item in aggregateItems:
|
||||
let proof = AggregateAndProof(
|
||||
aggregator_index: item.aggregator_index,
|
||||
aggregate: aggAttestation,
|
||||
selection_proof: item.selection_proof
|
||||
)
|
||||
res.add(service.serveAggregateAndProof(proof, item.validator))
|
||||
res
|
||||
|
||||
let statistics =
|
||||
block:
|
||||
var errored, succeed, failed = 0
|
||||
try:
|
||||
await allFutures(pendingAggregates)
|
||||
except CancelledError:
|
||||
for fut in pendingAggregates:
|
||||
if not(fut.finished()):
|
||||
fut.cancel()
|
||||
await allFutures(pendingAggregates)
|
||||
|
||||
for future in pendingAggregates:
|
||||
if future.done():
|
||||
if future.read():
|
||||
inc(succeed)
|
||||
else:
|
||||
inc(failed)
|
||||
else:
|
||||
inc(errored)
|
||||
(succeed, errored, failed)
|
||||
|
||||
let delay = vc.getDelay(seconds((int64(SECONDS_PER_SLOT) div 3) * 2))
|
||||
debug "Aggregate attestation statistics", total = len(pendingAggregates),
|
||||
succeed = statistics[0], failed_to_deliver = statistics[1],
|
||||
not_accepted = statistics[2], delay = delay, slot = slot,
|
||||
committee_index = committeeIndex
|
||||
|
||||
else:
|
||||
warn "No aggregate and proofs produced"
|
||||
notice "No aggregate and proofs scheduled for slot", slot = slot,
|
||||
committee_index = committeeIndex
|
||||
|
||||
proc publishAttestationsAndAggregates(service: AttestationServiceRef,
|
||||
slot: Slot,
|
||||
committee_index: CommitteeIndex,
|
||||
duties: seq[RestAttesterDuty]) {.async.} =
|
||||
duties: seq[DutyAndProof]) {.async.} =
|
||||
let vc = service.client
|
||||
let aggregateTime =
|
||||
# chronos.Duration substraction could not return negative value, in such
|
||||
|
@ -217,11 +273,11 @@ proc spawnAttestationTasks(service: AttestationServiceRef,
|
|||
let vc = service.client
|
||||
let dutiesByCommittee =
|
||||
block:
|
||||
var res: Table[CommitteeIndex, seq[RestAttesterDuty]]
|
||||
var res: Table[CommitteeIndex, seq[DutyAndProof]]
|
||||
let attesters = vc.getAttesterDutiesForSlot(slot)
|
||||
var default: seq[RestAttesterDuty]
|
||||
var default: seq[DutyAndProof]
|
||||
for item in attesters:
|
||||
res.mgetOrPut(item.committee_index, default).add(item)
|
||||
res.mgetOrPut(item.data.committee_index, default).add(item)
|
||||
res
|
||||
for index, duties in dutiesByCommittee.pairs():
|
||||
if len(duties) > 0:
|
||||
|
|
|
@ -178,15 +178,15 @@ proc getCurrentSlot*(vc: ValidatorClientRef): Option[Slot] =
|
|||
some(wallSlot.slot)
|
||||
|
||||
proc getAttesterDutiesForSlot*(vc: ValidatorClientRef,
|
||||
slot: Slot): seq[RestAttesterDuty] =
|
||||
slot: Slot): seq[DutyAndProof] =
|
||||
## Returns all `DutyAndPrrof` for the given `slot`.
|
||||
var res: seq[RestAttesterDuty]
|
||||
var res: seq[DutyAndProof]
|
||||
let epoch = slot.epoch()
|
||||
for key, item in vc.attesters.pairs():
|
||||
let duty = item.duties.getOrDefault(epoch, DefaultDutyAndProof)
|
||||
if not(duty.isDefault()):
|
||||
if duty.data.slot == slot:
|
||||
res.add(duty.data)
|
||||
res.add(duty)
|
||||
res
|
||||
|
||||
proc getDurationToNextAttestation*(vc: ValidatorClientRef,
|
||||
|
|
Loading…
Reference in New Issue