initial implementation of attestation aggregation for the validator API

This commit is contained in:
Viktor Kirilov 2020-09-14 14:13:30 +03:00
parent 4ded755d32
commit e926db22b9
9 changed files with 109 additions and 24 deletions

View File

@ -22,13 +22,17 @@ const
MAXIMUM_GOSSIP_CLOCK_DISPARITY = 500.millis
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/validator.md#aggregation-selection
func is_aggregator*(committee_len: uint64, slot_signature: ValidatorSig): bool =
let
modulo = max(1'u64, committee_len div TARGET_AGGREGATORS_PER_COMMITTEE)
bytes_to_uint64(eth2digest(
slot_signature.toRaw()).data.toOpenArray(0, 7)) mod modulo == 0
func is_aggregator(state: BeaconState, slot: Slot, index: CommitteeIndex,
slot_signature: ValidatorSig, cache: var StateCache): bool =
let
committee_len = get_beacon_committee_len(state, slot, index, cache)
modulo = max(1'u64, committee_len div TARGET_AGGREGATORS_PER_COMMITTEE)
bytes_to_uint64(eth2digest(
slot_signature.toRaw()).data.toOpenArray(0, 7)) mod modulo == 0
return is_aggregator(committee_len, slot_signature)
proc aggregate_attestations*(
pool: AttestationPool, state: BeaconState, index: CommitteeIndex,

View File

@ -14,6 +14,7 @@ import
chronicles, stew/[byteutils], json_serialization/std/sets as jsonSets,
# Internal
./spec/[beaconstate, datatypes, crypto, digest, helpers],
ssz/merkleization,
./block_pools/[spec_cache, chain_dag, clearance], ./beacon_node_types,
./fork_choice/fork_choice
@ -102,6 +103,29 @@ proc updateCurrent(pool: var AttestationPool, wallSlot: Slot) =
pool.startingSlot = newWallSlot
# now also clear old aggregated attestations
var keysToRemove: seq[Slot] = @[]
for k, v in pool.attestationAggregates.pairs:
if k < pool.startingSlot:
keysToRemove.add k
for k in keysToRemove:
pool.attestationAggregates.del k
proc addToAggregates(pool: var AttestationPool, attestation: Attestation) =
# do a lookup for the current slot and get it's associated htrs/attestations
var aggreated_attestation = pool.attestationAggregates.mgetOrPut(
attestation.data.slot, Table[Eth2Digest, Attestation]()).
# do a lookup for the same attestation data htr and get the attestation
mgetOrPut(attestation.data.hash_tree_root, attestation)
# if the aggregation bits differ (we didn't just insert it into the table)
# and only if there is no overlap of the signatures ==> aggregate!
if not aggreated_attestation.aggregation_bits.overlaps(attestation.aggregation_bits):
var agg {.noInit.}: AggregateSignature
agg.init(aggreated_attestation.signature)
aggreated_attestation.aggregation_bits.combine(attestation.aggregation_bits)
agg.aggregate(attestation.signature)
aggreated_attestation.signature = agg.finish()
proc addAttestation*(pool: var AttestationPool,
attestation: Attestation,
participants: HashSet[ValidatorIndex],
@ -126,6 +150,8 @@ proc addAttestation*(pool: var AttestationPool,
startingSlot = pool.startingSlot
return
pool.addToAggregates(attestation)
let
attestationsSeen = addr pool.candidates[candidateIdx.get]
validation = Validation(
@ -274,6 +300,17 @@ proc getAttestationsForBlock*(pool: AttestationPool,
attestationSlot = newBlockSlot - 1
return
proc getAggregatedAttestation*(pool: AttestationPool,
slot: Slot,
ad_htr: Eth2Digest): Option[Attestation] =
try:
if pool.attestationAggregates.contains(slot) and
pool.attestationAggregates[slot].contains(ad_htr):
return some pool.attestationAggregates[slot][ad_htr]
except KeyError:
doAssert(false) # shouldn't be possible because we check with `contains`
return none(Attestation)
proc getAggregatedAttestation*(pool: AttestationPool,
slot: Slot,
index: CommitteeIndex): Option[Attestation] =

View File

@ -3,7 +3,7 @@
import
std/[deques, tables, streams],
stew/endians2,
spec/[datatypes, crypto],
spec/[datatypes, digest, crypto],
block_pools/block_pools_types,
fork_choice/fork_choice_types,
validator_slashing_protection
@ -52,6 +52,10 @@ type
## "free" attestations with those found in past blocks - these votes
## are tracked separately in the fork choice.
attestationAggregates*: Table[Slot, Table[Eth2Digest, Attestation]]
## An up-to-date aggregate of each (htr-ed) attestation_data we see for
## each slot. We keep aggregates up to 32 slots back from the current slot.
candidates*: array[ATTESTATION_LOOKBACK, AttestationsSeen] ## \
## We keep one item per slot such that indexing matches slot number
## together with startingSlot

View File

@ -33,8 +33,6 @@ type
VCStartUpCmd* = enum
VCNoCommand
Web3Url* = distinct string
BeaconNodeConf* = object
logLevel* {.
defaultValue: "DEBUG"

View File

@ -48,6 +48,7 @@ template genFromJsonForIntType(t: untyped) =
genFromJsonForIntType(Epoch)
genFromJsonForIntType(Slot)
genFromJsonForIntType(CommitteeIndex)
genFromJsonForIntType(ValidatorIndex)
template `%`*(value: GraffitiBytes): JsonNode =
%($value)
@ -58,3 +59,6 @@ proc fromJson*(n: JsonNode, argName: string, value: var GraffitiBytes) =
proc `%`*(value: CommitteeIndex): JsonNode =
result = newJInt(value.int)
proc `%`*(value: ValidatorIndex): JsonNode =
result = newJInt(value.int)

View File

@ -10,6 +10,7 @@ import
type
AttesterDuties* = tuple
public_key: ValidatorPubKey
validator_index: ValidatorIndex
committee_index: CommitteeIndex
committee_length: uint64
validator_committee_index: uint64

View File

@ -16,16 +16,12 @@ proc post_v1_validator_block(body: SignedBeaconBlock): bool
proc get_v1_validator_attestation(slot: Slot, committee_index: CommitteeIndex): AttestationData
# TODO at the time of writing (10.06.2020) the API specifies this call to have a hash of
# the attestation data instead of the object itself but we also need the slot.. see here:
# https://docs.google.com/spreadsheets/d/1kVIx6GvzVLwNYbcd-Fj8YUlPf4qGrWUlS35uaTnIAVg/edit?disco=AAAAGh7r_fQ
proc get_v1_validator_aggregate_and_proof(attestation_data: AttestationData): Attestation
proc get_v1_validator_aggregate_attestation(slot: Slot, attestation_data_root: Eth2Digest): Attestation
proc post_v1_validator_aggregate_and_proof(payload: SignedAggregateAndProof): bool
proc post_v1_validator_aggregate_and_proofs(payload: SignedAggregateAndProof): bool
# this is a POST instead of a GET because of this: https://docs.google.com/spreadsheets/d/1kVIx6GvzVLwNYbcd-Fj8YUlPf4qGrWUlS35uaTnIAVg/edit?disco=AAAAJk5rbKA
# TODO epoch is part of the REST path
proc post_v1_validator_duties_attester(epoch: Epoch, public_keys: seq[ValidatorPubKey]): seq[AttesterDuties]
proc get_v1_validator_duties_attester(epoch: Epoch, public_keys: seq[ValidatorPubKey]): seq[AttesterDuties]
# TODO epoch is part of the REST path
proc get_v1_validator_duties_proposer(epoch: Epoch): seq[ValidatorPubkeySlotPair]

View File

@ -17,7 +17,7 @@ import
# Local modules
spec/[datatypes, digest, crypto, validator, helpers],
block_pools/[chain_dag, spec_cache], ssz/merkleization,
beacon_node_common, beacon_node_types,
beacon_node_common, beacon_node_types, attestation_pool,
validator_duties, eth2_network,
spec/eth2_apis/callsigs_types,
eth2_json_rpc_serialization
@ -179,6 +179,7 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) =
let GENESIS_FORK_VERSION = node.config.runtimePreset.GENESIS_FORK_VERSION
template withStateForStateId(stateId: string, body: untyped): untyped =
# TODO this can be optimized for the "head" case since that should be most common
node.chainDag.withState(node.chainDag.tmpState,
node.stateIdToBlockSlot(stateId)):
body
@ -335,19 +336,24 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) =
epochRef = node.chainDag.getEpochRef(head, slot.epoch)
return makeAttestationData(epochRef, head.atSlot(slot), committee_index.uint64)
rpcServer.rpc("get_v1_validator_aggregate_and_proof") do (
attestation_data: AttestationData)-> Attestation:
debug "get_v1_validator_aggregate_and_proof"
raise newException(CatchableError, "Not implemented")
rpcServer.rpc("get_v1_validator_aggregate_attestation") do (
slot: Slot, attestation_data_root: Eth2Digest)-> Attestation:
debug "get_v1_validator_aggregate_attestation"
let res = node.attestationPool[].getAggregatedAttestation(slot, attestation_data_root)
if res.isSome:
return res.get
raise newException(CatchableError, "Could not retrieve an aggregated attestation")
rpcServer.rpc("post_v1_validator_aggregate_and_proof") do (
rpcServer.rpc("post_v1_validator_aggregate_and_proofs") do (
payload: SignedAggregateAndProof) -> bool:
debug "post_v1_validator_aggregate_and_proof"
raise newException(CatchableError, "Not implemented")
debug "post_v1_validator_aggregate_and_proofs"
node.network.broadcast(node.topicAggregateAndProofs, payload)
info "Aggregated attestation sent",
attestation = shortLog(payload.message.aggregate)
rpcServer.rpc("post_v1_validator_duties_attester") do (
rpcServer.rpc("get_v1_validator_duties_attester") do (
epoch: Epoch, public_keys: seq[ValidatorPubKey]) -> seq[AttesterDuties]:
debug "post_v1_validator_duties_attester", epoch = epoch
debug "get_v1_validator_duties_attester", epoch = epoch
let
head = node.doChecksAndGetCurrentHead(epoch)
epochRef = node.chainDag.getEpochRef(head, epoch)
@ -362,6 +368,7 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) =
let curr_val_pubkey = epochRef.validator_keys[validatorIdx].initPubKey
if public_keys.findIt(it == curr_val_pubkey) != -1:
result.add((public_key: curr_val_pubkey,
validator_index: validatorIdx,
committee_index: committee_index.CommitteeIndex,
committee_length: committee.lenu64,
validator_committee_index: index_in_committee.uint64,

View File

@ -19,6 +19,7 @@ import
spec/[datatypes, digest, crypto, helpers, network, signatures],
conf, time, version,
eth2_network, eth2_discovery, validator_pool, beacon_node_types,
attestation_aggregation,
nimbus_binary_common,
version, ssz/merkleization,
sync_manager, keystore_management,
@ -75,7 +76,7 @@ proc getValidatorDutiesForEpoch(vc: ValidatorClient, epoch: Epoch) {.gcsafe, asy
# make sure there's an entry
if not vc.attestationsForEpoch.contains epoch:
vc.attestationsForEpoch.add(epoch, Table[Slot, seq[AttesterDuties]]())
let attestations = await vc.client.post_v1_validator_duties_attester(
let attestations = await vc.client.get_v1_validator_duties_attester(
epoch, validatorPubkeys)
for a in attestations:
if vc.attestationsForEpoch[epoch].hasKeyOrPut(a.slot, @[a]):
@ -176,6 +177,7 @@ proc onSlotStart(vc: ValidatorClient, lastSlot, scheduledSlot: Slot) {.gcsafe, a
# check if we have validators which need to attest on this slot
if vc.attestationsForEpoch.contains(epoch) and
vc.attestationsForEpoch[epoch].contains slot:
var validatorToAttestationDataRoot: Table[ValidatorPubKey, Eth2Digest]
for a in vc.attestationsForEpoch[epoch][slot]:
info "Attesting", slot = slot, public_key = a.public_key
@ -208,6 +210,38 @@ proc onSlotStart(vc: ValidatorClient, lastSlot, scheduledSlot: Slot) {.gcsafe, a
validator = a.public_key,
badVoteDetails = $notSlashable.error
validatorToAttestationDataRoot[a.public_key] = attestation.data.hash_tree_root
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/validator.md#broadcast-aggregate
# If the validator is selected to aggregate (is_aggregator), then they
# broadcast their best aggregate as a SignedAggregateAndProof to the global
# aggregate channel (beacon_aggregate_and_proof) two-thirds of the way
# through the slot-that is, SECONDS_PER_SLOT * 2 / 3 seconds after the start
# of slot.
if slot > 2:
discard await vc.beaconClock.sleepToSlotOffset(
seconds(int64(SECONDS_PER_SLOT * 2)) div 3, slot,
"Waiting to aggregate attestations")
# loop again over all of our validators which need to attest on
# this slot and check if we should also aggregate attestations
for a in vc.attestationsForEpoch[epoch][slot]:
let validator = vc.attachedValidators.validators[a.public_key]
let slot_signature = await getSlotSig(validator, vc.fork,
vc.beaconGenesis.genesis_validators_root, slot)
if is_aggregator(a.committee_length, slot_signature):
info "Aggregating", slot = slot, public_key = a.public_key
let aa = await vc.client.get_v1_validator_aggregate_attestation(
slot, validatorToAttestationDataRoot[a.public_key])
let aap = AggregateAndProof(aggregator_index: a.validator_index.uint64,
aggregate: aa, selection_proof: slot_signature)
let sig = await signAggregateAndProof(validator,
aap, vc.fork, vc.beaconGenesis.genesis_validators_root)
var signedAP = SignedAggregateAndProof(message: aap, signature: sig)
discard await vc.client.post_v1_validator_aggregate_and_proofs(signedAP)
except CatchableError as err:
warn "Caught an unexpected error", err = err.msg, slot = shortLog(slot)