diff --git a/beacon_chain/validator_client/attestation_service.nim b/beacon_chain/validator_client/attestation_service.nim index 45737681c..1d503bf8f 100644 --- a/beacon_chain/validator_client/attestation_service.nim +++ b/beacon_chain/validator_client/attestation_service.nim @@ -4,10 +4,16 @@ import common, api, block_service logScope: service = "attestation_service" +type + AggregateItem* = object + aggregator_index: uint64 + selection_proof: ValidatorSig + validator: AttachedValidator + proc serveAttestation(service: AttestationServiceRef, adata: AttestationData, - duty: RestAttesterDuty): Future[bool] {.async.} = + duty: DutyAndProof): Future[bool] {.async.} = let vc = service.client - let validator = vc.attachedValidators.getValidator(duty.pubkey) + let validator = vc.attachedValidators.getValidator(duty.data.pubkey) if validator.index.isNone(): warn "Validator index is missing", validator = validator.pubKey return false @@ -26,38 +32,65 @@ proc serveAttestation(service: AttestationServiceRef, adata: AttestationData, adata.source.epoch, adata.target.epoch, signingRoot) if notSlashable.isErr(): - warn "Slashing protection activated for attestation", slot = duty.slot, - validator = validator.pubKey, validator_index = duty.validator_index, + warn "Slashing protection activated for attestation", slot = duty.data.slot, + validator = validator.pubKey, + validator_index = duty.data.validator_index, badVoteDetails = $notSlashable.error return false let attestation = await validator.produceAndSignAttestation(adata, - int(duty.committee_length), Natural(duty.validator_committee_index), + int(duty.data.committee_length), + Natural(duty.data.validator_committee_index), fork, vc.beaconGenesis.genesis_validators_root) let res = try: await vc.submitPoolAttestations(@[attestation]) except ValidatorApiError as exc: - error "Unable to submit attestation", slot = duty.slot, - validator = validator.pubKey, validator_index = duty.validator_index + error "Unable to submit attestation", slot = duty.data.slot, + validator = validator.pubKey, + validator_index = duty.data.validator_index raise exc let delay = vc.getDelay(seconds(int64(SECONDS_PER_SLOT) div 3)) if res: notice "Attestation published", validator = validator.pubKey, - validator_index = duty.validator_index, slot = duty.slot, + validator_index = duty.data.validator_index, slot = duty.data.slot, delay = delay return true else: warn "Attestation was not accepted by beacon node", - validator = validator.pubKey, validator_index = duty.validator_index, - slot = duty.slot, delay = delay + validator = validator.pubKey, + validator_index = duty.data.validator_index, + slot = duty.data.slot, delay = delay + return false + +proc serveAggregateAndProof*(service: AttestationServiceRef, + proof: AggregateAndProof, + validator: AttachedValidator): Future[bool] {. + async.} = + let + vc = service.client + genesisRoot = vc.beaconGenesis.genesis_validators_root + fork = vc.fork.get() + + let signature = await signAggregateAndProof(validator, proof, fork, + genesisRoot) + let signedProof = SignedAggregateAndProof(message: proof, + signature: signature) + try: + return await vc.publishAggregateAndProofs(@[signedProof]): + except ValidatorApiError: + warn "Unable to publish aggregate and proofs" + return false + except CatchableError as exc: + error "Unexpected error happened", err_name = exc.name, + err_msg = exc.msg return false proc produceAndPublishAttestations*(service: AttestationServiceRef, slot: Slot, committee_index: CommitteeIndex, - duties: seq[RestAttesterDuty] + duties: seq[DutyAndProof] ): Future[AttestationData] {. async.} = doAssert(MAX_VALIDATORS_PER_COMMITTEE <= uint64(high(int))) @@ -71,12 +104,12 @@ proc produceAndPublishAttestations*(service: AttestationServiceRef, block: var res: seq[Future[bool]] for duty in duties: - debug "Serving attestation duty", duty = duty, epoch = slot.epoch() - if (duty.slot != ad.slot) or - (uint64(duty.committee_index) != ad.index): + debug "Serving attestation duty", duty = duty.data, epoch = slot.epoch() + if (duty.data.slot != ad.slot) or + (uint64(duty.data.committee_index) != ad.index): error "Inconsistent validator duties during attestation signing", - validator = duty.pubkey, duty_slot = duty.slot, - duty_index = duty.committee_index, + validator = duty.data.pubkey, duty_slot = duty.data.slot, + duty_index = duty.data.committee_index, attestation_slot = ad.slot, attestation_index = ad.index continue res.add(service.serveAttestation(ad, duty)) @@ -113,7 +146,7 @@ proc produceAndPublishAttestations*(service: AttestationServiceRef, proc produceAndPublishAggregates(service: AttestationServiceRef, adata: AttestationData, - duties: seq[RestAttesterDuty]) {.async.} = + duties: seq[DutyAndProof]) {.async.} = let vc = service.client slot = adata.slot @@ -121,61 +154,84 @@ proc produceAndPublishAggregates(service: AttestationServiceRef, attestationRoot = adata.hash_tree_root() genesisRoot = vc.beaconGenesis.genesis_validators_root - let aggAttestation = - try: - await vc.getAggregatedAttestation(slot, attestationRoot) - except ValidatorApiError: - error "Unable to retrieve aggregated attestation data" - return - - let aggregateAndProofs = + let aggregateItems = block: - var res: seq[SignedAggregateAndProof] + var res: seq[AggregateItem] for duty in duties: - let validator = vc.attachedValidators.getValidator(duty.pubkey) - let slotSignature = await getSlotSig(validator, vc.fork.get(), - genesisRoot, slot) - if (duty.slot != slot) or (duty.committee_index != committeeIndex): - error "Inconsistent validator duties during aggregate signing", - duty_slot = duty.slot, slot = slot, - duty_committee_index = duty.committee_index, - committee_index = committeeIndex - continue - - if is_aggregator(duty.committee_length, slotSignature): - notice "Aggregating", slot = slot, validator = duty.pubkey - - let aggAndProof = AggregateAndProof( - aggregator_index: uint64(duty.validator_index), - aggregate: aggAttestation, - selection_proof: slot_signature - ) - let signature = await signAggregateAndProof(validator, aggAndProof, - vc.fork.get(), - genesisRoot) - res.add(SignedAggregateAndProof(message: aggAndProof, - signature: signature)) + let validator = vc.attachedValidators.getValidator(duty.data.pubkey) + if not(isNil(validator)): + if (duty.data.slot != slot) or + (duty.data.committee_index != committeeIndex): + error "Inconsistent validator duties during aggregate signing", + duty_slot = duty.data.slot, slot = slot, + duty_committee_index = duty.data.committee_index, + committee_index = committeeIndex + continue + if duty.slotSig.isSome(): + let slotSignature = duty.slotSig.get() + if is_aggregator(duty.data.committee_length, slotSignature): + res.add(AggregateItem( + aggregator_index: uint64(duty.data.validator_index), + selection_proof: slotSignature, + validator: validator + )) res - let count = len(aggregateAndProofs) - if count > 0: - let res = + if len(aggregateItems) > 0: + let aggAttestation = try: - await vc.publishAggregateAndProofs(aggregateAndProofs) + await vc.getAggregatedAttestation(slot, attestationRoot) except ValidatorApiError: - warn "Unable to publish aggregate and proofs" + error "Unable to retrieve aggregated attestation data" return - if res: - notice "Successfully published aggregate and proofs", count = count - else: - warn "Aggregate and proofs not accepted by beacon node", count = count + + let pendingAggregates = + block: + var res: seq[Future[bool]] + for item in aggregateItems: + let proof = AggregateAndProof( + aggregator_index: item.aggregator_index, + aggregate: aggAttestation, + selection_proof: item.selection_proof + ) + res.add(service.serveAggregateAndProof(proof, item.validator)) + res + + let statistics = + block: + var errored, succeed, failed = 0 + try: + await allFutures(pendingAggregates) + except CancelledError: + for fut in pendingAggregates: + if not(fut.finished()): + fut.cancel() + await allFutures(pendingAggregates) + + for future in pendingAggregates: + if future.done(): + if future.read(): + inc(succeed) + else: + inc(failed) + else: + inc(errored) + (succeed, errored, failed) + + let delay = vc.getDelay(seconds((int64(SECONDS_PER_SLOT) div 3) * 2)) + debug "Aggregate attestation statistics", total = len(pendingAggregates), + succeed = statistics[0], failed_to_deliver = statistics[1], + not_accepted = statistics[2], delay = delay, slot = slot, + committee_index = committeeIndex + else: - warn "No aggregate and proofs produced" + notice "No aggregate and proofs scheduled for slot", slot = slot, + committee_index = committeeIndex proc publishAttestationsAndAggregates(service: AttestationServiceRef, slot: Slot, committee_index: CommitteeIndex, - duties: seq[RestAttesterDuty]) {.async.} = + duties: seq[DutyAndProof]) {.async.} = let vc = service.client let aggregateTime = # chronos.Duration substraction could not return negative value, in such @@ -217,11 +273,11 @@ proc spawnAttestationTasks(service: AttestationServiceRef, let vc = service.client let dutiesByCommittee = block: - var res: Table[CommitteeIndex, seq[RestAttesterDuty]] + var res: Table[CommitteeIndex, seq[DutyAndProof]] let attesters = vc.getAttesterDutiesForSlot(slot) - var default: seq[RestAttesterDuty] + var default: seq[DutyAndProof] for item in attesters: - res.mgetOrPut(item.committee_index, default).add(item) + res.mgetOrPut(item.data.committee_index, default).add(item) res for index, duties in dutiesByCommittee.pairs(): if len(duties) > 0: diff --git a/beacon_chain/validator_client/common.nim b/beacon_chain/validator_client/common.nim index 5c27679b4..8c0e2d31d 100644 --- a/beacon_chain/validator_client/common.nim +++ b/beacon_chain/validator_client/common.nim @@ -178,15 +178,15 @@ proc getCurrentSlot*(vc: ValidatorClientRef): Option[Slot] = some(wallSlot.slot) proc getAttesterDutiesForSlot*(vc: ValidatorClientRef, - slot: Slot): seq[RestAttesterDuty] = + slot: Slot): seq[DutyAndProof] = ## Returns all `DutyAndPrrof` for the given `slot`. - var res: seq[RestAttesterDuty] + var res: seq[DutyAndProof] let epoch = slot.epoch() for key, item in vc.attesters.pairs(): let duty = item.duties.getOrDefault(epoch, DefaultDutyAndProof) if not(duty.isDefault()): if duty.data.slot == slot: - res.add(duty.data) + res.add(duty) res proc getDurationToNextAttestation*(vc: ValidatorClientRef,