From e926db22b94b8202a67f89a7739e7e035f341283 Mon Sep 17 00:00:00 2001 From: Viktor Kirilov Date: Mon, 14 Sep 2020 14:13:30 +0300 Subject: [PATCH] initial implementation of attestation aggregation for the validator API --- beacon_chain/attestation_aggregation.nim | 10 +++-- beacon_chain/attestation_pool.nim | 37 +++++++++++++++++++ beacon_chain/beacon_node_types.nim | 6 ++- beacon_chain/conf.nim | 2 - beacon_chain/eth2_json_rpc_serialization.nim | 4 ++ .../spec/eth2_apis/callsigs_types.nim | 1 + .../spec/eth2_apis/validator_callsigs.nim | 10 ++--- beacon_chain/validator_api.nim | 27 +++++++++----- beacon_chain/validator_client.nim | 36 +++++++++++++++++- 9 files changed, 109 insertions(+), 24 deletions(-) diff --git a/beacon_chain/attestation_aggregation.nim b/beacon_chain/attestation_aggregation.nim index 8c48fa77e..5d9877914 100644 --- a/beacon_chain/attestation_aggregation.nim +++ b/beacon_chain/attestation_aggregation.nim @@ -22,13 +22,17 @@ const MAXIMUM_GOSSIP_CLOCK_DISPARITY = 500.millis # https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/validator.md#aggregation-selection +func is_aggregator*(committee_len: uint64, slot_signature: ValidatorSig): bool = + let + modulo = max(1'u64, committee_len div TARGET_AGGREGATORS_PER_COMMITTEE) + bytes_to_uint64(eth2digest( + slot_signature.toRaw()).data.toOpenArray(0, 7)) mod modulo == 0 + func is_aggregator(state: BeaconState, slot: Slot, index: CommitteeIndex, slot_signature: ValidatorSig, cache: var StateCache): bool = let committee_len = get_beacon_committee_len(state, slot, index, cache) - modulo = max(1'u64, committee_len div TARGET_AGGREGATORS_PER_COMMITTEE) - bytes_to_uint64(eth2digest( - slot_signature.toRaw()).data.toOpenArray(0, 7)) mod modulo == 0 + return is_aggregator(committee_len, slot_signature) proc aggregate_attestations*( pool: AttestationPool, state: BeaconState, index: CommitteeIndex, diff --git a/beacon_chain/attestation_pool.nim b/beacon_chain/attestation_pool.nim index e4fb29859..b5d9af02c 100644 --- a/beacon_chain/attestation_pool.nim +++ b/beacon_chain/attestation_pool.nim @@ -14,6 +14,7 @@ import chronicles, stew/[byteutils], json_serialization/std/sets as jsonSets, # Internal ./spec/[beaconstate, datatypes, crypto, digest, helpers], + ssz/merkleization, ./block_pools/[spec_cache, chain_dag, clearance], ./beacon_node_types, ./fork_choice/fork_choice @@ -102,6 +103,29 @@ proc updateCurrent(pool: var AttestationPool, wallSlot: Slot) = pool.startingSlot = newWallSlot + # now also clear old aggregated attestations + var keysToRemove: seq[Slot] = @[] + for k, v in pool.attestationAggregates.pairs: + if k < pool.startingSlot: + keysToRemove.add k + for k in keysToRemove: + pool.attestationAggregates.del k + +proc addToAggregates(pool: var AttestationPool, attestation: Attestation) = + # do a lookup for the current slot and get it's associated htrs/attestations + var aggreated_attestation = pool.attestationAggregates.mgetOrPut( + attestation.data.slot, Table[Eth2Digest, Attestation]()). + # do a lookup for the same attestation data htr and get the attestation + mgetOrPut(attestation.data.hash_tree_root, attestation) + # if the aggregation bits differ (we didn't just insert it into the table) + # and only if there is no overlap of the signatures ==> aggregate! + if not aggreated_attestation.aggregation_bits.overlaps(attestation.aggregation_bits): + var agg {.noInit.}: AggregateSignature + agg.init(aggreated_attestation.signature) + aggreated_attestation.aggregation_bits.combine(attestation.aggregation_bits) + agg.aggregate(attestation.signature) + aggreated_attestation.signature = agg.finish() + proc addAttestation*(pool: var AttestationPool, attestation: Attestation, participants: HashSet[ValidatorIndex], @@ -126,6 +150,8 @@ proc addAttestation*(pool: var AttestationPool, startingSlot = pool.startingSlot return + pool.addToAggregates(attestation) + let attestationsSeen = addr pool.candidates[candidateIdx.get] validation = Validation( @@ -274,6 +300,17 @@ proc getAttestationsForBlock*(pool: AttestationPool, attestationSlot = newBlockSlot - 1 return +proc getAggregatedAttestation*(pool: AttestationPool, + slot: Slot, + ad_htr: Eth2Digest): Option[Attestation] = + try: + if pool.attestationAggregates.contains(slot) and + pool.attestationAggregates[slot].contains(ad_htr): + return some pool.attestationAggregates[slot][ad_htr] + except KeyError: + doAssert(false) # shouldn't be possible because we check with `contains` + return none(Attestation) + proc getAggregatedAttestation*(pool: AttestationPool, slot: Slot, index: CommitteeIndex): Option[Attestation] = diff --git a/beacon_chain/beacon_node_types.nim b/beacon_chain/beacon_node_types.nim index 70a97fb4b..af45dc3e0 100644 --- a/beacon_chain/beacon_node_types.nim +++ b/beacon_chain/beacon_node_types.nim @@ -3,7 +3,7 @@ import std/[deques, tables, streams], stew/endians2, - spec/[datatypes, crypto], + spec/[datatypes, digest, crypto], block_pools/block_pools_types, fork_choice/fork_choice_types, validator_slashing_protection @@ -52,6 +52,10 @@ type ## "free" attestations with those found in past blocks - these votes ## are tracked separately in the fork choice. + attestationAggregates*: Table[Slot, Table[Eth2Digest, Attestation]] + ## An up-to-date aggregate of each (htr-ed) attestation_data we see for + ## each slot. We keep aggregates up to 32 slots back from the current slot. + candidates*: array[ATTESTATION_LOOKBACK, AttestationsSeen] ## \ ## We keep one item per slot such that indexing matches slot number ## together with startingSlot diff --git a/beacon_chain/conf.nim b/beacon_chain/conf.nim index 26a0c2330..75861dd43 100644 --- a/beacon_chain/conf.nim +++ b/beacon_chain/conf.nim @@ -33,8 +33,6 @@ type VCStartUpCmd* = enum VCNoCommand - Web3Url* = distinct string - BeaconNodeConf* = object logLevel* {. defaultValue: "DEBUG" diff --git a/beacon_chain/eth2_json_rpc_serialization.nim b/beacon_chain/eth2_json_rpc_serialization.nim index 7a128c76a..5b7b267ad 100644 --- a/beacon_chain/eth2_json_rpc_serialization.nim +++ b/beacon_chain/eth2_json_rpc_serialization.nim @@ -48,6 +48,7 @@ template genFromJsonForIntType(t: untyped) = genFromJsonForIntType(Epoch) genFromJsonForIntType(Slot) genFromJsonForIntType(CommitteeIndex) +genFromJsonForIntType(ValidatorIndex) template `%`*(value: GraffitiBytes): JsonNode = %($value) @@ -58,3 +59,6 @@ proc fromJson*(n: JsonNode, argName: string, value: var GraffitiBytes) = proc `%`*(value: CommitteeIndex): JsonNode = result = newJInt(value.int) + +proc `%`*(value: ValidatorIndex): JsonNode = + result = newJInt(value.int) diff --git a/beacon_chain/spec/eth2_apis/callsigs_types.nim b/beacon_chain/spec/eth2_apis/callsigs_types.nim index 9c1ed916d..ff7198016 100644 --- a/beacon_chain/spec/eth2_apis/callsigs_types.nim +++ b/beacon_chain/spec/eth2_apis/callsigs_types.nim @@ -10,6 +10,7 @@ import type AttesterDuties* = tuple public_key: ValidatorPubKey + validator_index: ValidatorIndex committee_index: CommitteeIndex committee_length: uint64 validator_committee_index: uint64 diff --git a/beacon_chain/spec/eth2_apis/validator_callsigs.nim b/beacon_chain/spec/eth2_apis/validator_callsigs.nim index 2fdf11032..d4085a4fb 100644 --- a/beacon_chain/spec/eth2_apis/validator_callsigs.nim +++ b/beacon_chain/spec/eth2_apis/validator_callsigs.nim @@ -16,16 +16,12 @@ proc post_v1_validator_block(body: SignedBeaconBlock): bool proc get_v1_validator_attestation(slot: Slot, committee_index: CommitteeIndex): AttestationData -# TODO at the time of writing (10.06.2020) the API specifies this call to have a hash of -# the attestation data instead of the object itself but we also need the slot.. see here: -# https://docs.google.com/spreadsheets/d/1kVIx6GvzVLwNYbcd-Fj8YUlPf4qGrWUlS35uaTnIAVg/edit?disco=AAAAGh7r_fQ -proc get_v1_validator_aggregate_and_proof(attestation_data: AttestationData): Attestation +proc get_v1_validator_aggregate_attestation(slot: Slot, attestation_data_root: Eth2Digest): Attestation -proc post_v1_validator_aggregate_and_proof(payload: SignedAggregateAndProof): bool +proc post_v1_validator_aggregate_and_proofs(payload: SignedAggregateAndProof): bool -# this is a POST instead of a GET because of this: https://docs.google.com/spreadsheets/d/1kVIx6GvzVLwNYbcd-Fj8YUlPf4qGrWUlS35uaTnIAVg/edit?disco=AAAAJk5rbKA # TODO epoch is part of the REST path -proc post_v1_validator_duties_attester(epoch: Epoch, public_keys: seq[ValidatorPubKey]): seq[AttesterDuties] +proc get_v1_validator_duties_attester(epoch: Epoch, public_keys: seq[ValidatorPubKey]): seq[AttesterDuties] # TODO epoch is part of the REST path proc get_v1_validator_duties_proposer(epoch: Epoch): seq[ValidatorPubkeySlotPair] diff --git a/beacon_chain/validator_api.nim b/beacon_chain/validator_api.nim index 3a31cf73a..4b52382be 100644 --- a/beacon_chain/validator_api.nim +++ b/beacon_chain/validator_api.nim @@ -17,7 +17,7 @@ import # Local modules spec/[datatypes, digest, crypto, validator, helpers], block_pools/[chain_dag, spec_cache], ssz/merkleization, - beacon_node_common, beacon_node_types, + beacon_node_common, beacon_node_types, attestation_pool, validator_duties, eth2_network, spec/eth2_apis/callsigs_types, eth2_json_rpc_serialization @@ -179,6 +179,7 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) = let GENESIS_FORK_VERSION = node.config.runtimePreset.GENESIS_FORK_VERSION template withStateForStateId(stateId: string, body: untyped): untyped = + # TODO this can be optimized for the "head" case since that should be most common node.chainDag.withState(node.chainDag.tmpState, node.stateIdToBlockSlot(stateId)): body @@ -335,19 +336,24 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) = epochRef = node.chainDag.getEpochRef(head, slot.epoch) return makeAttestationData(epochRef, head.atSlot(slot), committee_index.uint64) - rpcServer.rpc("get_v1_validator_aggregate_and_proof") do ( - attestation_data: AttestationData)-> Attestation: - debug "get_v1_validator_aggregate_and_proof" - raise newException(CatchableError, "Not implemented") + rpcServer.rpc("get_v1_validator_aggregate_attestation") do ( + slot: Slot, attestation_data_root: Eth2Digest)-> Attestation: + debug "get_v1_validator_aggregate_attestation" + let res = node.attestationPool[].getAggregatedAttestation(slot, attestation_data_root) + if res.isSome: + return res.get + raise newException(CatchableError, "Could not retrieve an aggregated attestation") - rpcServer.rpc("post_v1_validator_aggregate_and_proof") do ( + rpcServer.rpc("post_v1_validator_aggregate_and_proofs") do ( payload: SignedAggregateAndProof) -> bool: - debug "post_v1_validator_aggregate_and_proof" - raise newException(CatchableError, "Not implemented") + debug "post_v1_validator_aggregate_and_proofs" + node.network.broadcast(node.topicAggregateAndProofs, payload) + info "Aggregated attestation sent", + attestation = shortLog(payload.message.aggregate) - rpcServer.rpc("post_v1_validator_duties_attester") do ( + rpcServer.rpc("get_v1_validator_duties_attester") do ( epoch: Epoch, public_keys: seq[ValidatorPubKey]) -> seq[AttesterDuties]: - debug "post_v1_validator_duties_attester", epoch = epoch + debug "get_v1_validator_duties_attester", epoch = epoch let head = node.doChecksAndGetCurrentHead(epoch) epochRef = node.chainDag.getEpochRef(head, epoch) @@ -362,6 +368,7 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) = let curr_val_pubkey = epochRef.validator_keys[validatorIdx].initPubKey if public_keys.findIt(it == curr_val_pubkey) != -1: result.add((public_key: curr_val_pubkey, + validator_index: validatorIdx, committee_index: committee_index.CommitteeIndex, committee_length: committee.lenu64, validator_committee_index: index_in_committee.uint64, diff --git a/beacon_chain/validator_client.nim b/beacon_chain/validator_client.nim index 3ede71bc9..928b100a0 100644 --- a/beacon_chain/validator_client.nim +++ b/beacon_chain/validator_client.nim @@ -19,6 +19,7 @@ import spec/[datatypes, digest, crypto, helpers, network, signatures], conf, time, version, eth2_network, eth2_discovery, validator_pool, beacon_node_types, + attestation_aggregation, nimbus_binary_common, version, ssz/merkleization, sync_manager, keystore_management, @@ -75,7 +76,7 @@ proc getValidatorDutiesForEpoch(vc: ValidatorClient, epoch: Epoch) {.gcsafe, asy # make sure there's an entry if not vc.attestationsForEpoch.contains epoch: vc.attestationsForEpoch.add(epoch, Table[Slot, seq[AttesterDuties]]()) - let attestations = await vc.client.post_v1_validator_duties_attester( + let attestations = await vc.client.get_v1_validator_duties_attester( epoch, validatorPubkeys) for a in attestations: if vc.attestationsForEpoch[epoch].hasKeyOrPut(a.slot, @[a]): @@ -176,6 +177,7 @@ proc onSlotStart(vc: ValidatorClient, lastSlot, scheduledSlot: Slot) {.gcsafe, a # check if we have validators which need to attest on this slot if vc.attestationsForEpoch.contains(epoch) and vc.attestationsForEpoch[epoch].contains slot: + var validatorToAttestationDataRoot: Table[ValidatorPubKey, Eth2Digest] for a in vc.attestationsForEpoch[epoch][slot]: info "Attesting", slot = slot, public_key = a.public_key @@ -208,6 +210,38 @@ proc onSlotStart(vc: ValidatorClient, lastSlot, scheduledSlot: Slot) {.gcsafe, a validator = a.public_key, badVoteDetails = $notSlashable.error + validatorToAttestationDataRoot[a.public_key] = attestation.data.hash_tree_root + + # https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/validator.md#broadcast-aggregate + # If the validator is selected to aggregate (is_aggregator), then they + # broadcast their best aggregate as a SignedAggregateAndProof to the global + # aggregate channel (beacon_aggregate_and_proof) two-thirds of the way + # through the slot-that is, SECONDS_PER_SLOT * 2 / 3 seconds after the start + # of slot. + if slot > 2: + discard await vc.beaconClock.sleepToSlotOffset( + seconds(int64(SECONDS_PER_SLOT * 2)) div 3, slot, + "Waiting to aggregate attestations") + + # loop again over all of our validators which need to attest on + # this slot and check if we should also aggregate attestations + for a in vc.attestationsForEpoch[epoch][slot]: + let validator = vc.attachedValidators.validators[a.public_key] + let slot_signature = await getSlotSig(validator, vc.fork, + vc.beaconGenesis.genesis_validators_root, slot) + + if is_aggregator(a.committee_length, slot_signature): + info "Aggregating", slot = slot, public_key = a.public_key + + let aa = await vc.client.get_v1_validator_aggregate_attestation( + slot, validatorToAttestationDataRoot[a.public_key]) + let aap = AggregateAndProof(aggregator_index: a.validator_index.uint64, + aggregate: aa, selection_proof: slot_signature) + let sig = await signAggregateAndProof(validator, + aap, vc.fork, vc.beaconGenesis.genesis_validators_root) + var signedAP = SignedAggregateAndProof(message: aap, signature: sig) + discard await vc.client.post_v1_validator_aggregate_and_proofs(signedAP) + except CatchableError as err: warn "Caught an unexpected error", err = err.msg, slot = shortLog(slot)