From 867d8f32233f58a533f6d5d5d79e925475ca91b3 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 10 May 2021 09:13:36 +0200 Subject: [PATCH] Perform attestation check before broadcast (#2550) Currently, we have a bit of a convoluted flow where when sending attestations, we start broadcasting them over gossip then pass them to the attestation validation to include them in the local attestation pool - it should be the other way around: we should be checking attestations _before_ gossipping them - this serves as an additional safety net to ensure that we don't publish junk - this becomes more important when publishing attestations from the API. Also, the REST API was performing its own validation meaning attestations coming from REST would be validated twice - finally, the JSON RPC wasn't pre-validating and would happily broadcast invalid attestations. * Unified attestation production pipeline with the same flow for gossip, locally and API-produced attestations: all are now validated and entered into the pool, then broadcast/republished * Refactor subnet handling with specific SubnetId alias, streamlining where subnets are computed, avoiding the need to pass around the number of active validators * Move some of the subnet handling code to eth2_network * Use BitArray throughout for subnet handling --- AllTests-mainnet.md | 7 +- beacon_chain/beacon_node_types.nim | 4 +- .../consensus_object_pools/spec_cache.nim | 8 +- .../gossip_processing/eth2_processor.nim | 9 +- .../gossip_processing/gossip_validation.nim | 67 +++--- beacon_chain/networking/eth2_network.nim | 49 ++++- beacon_chain/nimbus_beacon_node.nim | 194 +++++++----------- beacon_chain/nimbus_validator_client.nim | 9 +- beacon_chain/rpc/beacon_api.nim | 3 +- beacon_chain/rpc/beacon_rest_api.nim | 12 +- beacon_chain/rpc/validator_api.nim | 14 +- beacon_chain/spec/datatypes/base.nim | 20 ++ beacon_chain/spec/network.nim | 13 +- beacon_chain/ssz/bitseqs.nim | 26 +++ beacon_chain/validators/validator_duties.nim | 158 +++++++------- beacon_chain/validators/validator_pool.nim | 14 +- ncli/inspector.nim | 2 +- tests/test_bitseqs.nim | 40 +++- tests/test_gossip_validation.nim | 7 +- tests/test_honest_validator.nim | 40 ++-- 20 files changed, 398 insertions(+), 298 deletions(-) diff --git a/AllTests-mainnet.md b/AllTests-mainnet.md index bf687235b..7813d8ef3 100644 --- a/AllTests-mainnet.md +++ b/AllTests-mainnet.md @@ -101,9 +101,10 @@ OK: 1/1 Fail: 0/1 Skip: 0/1 ```diff + iterating words OK + overlaps OK -+ roundtrips OK ++ roundtrips BitArray OK ++ roundtrips BitSeq OK ``` -OK: 3/3 Fail: 0/3 Skip: 0/3 +OK: 4/4 Fail: 0/4 Skip: 0/4 ## Block pool processing [Preset: mainnet] ```diff + Adding the same block twice returns a Duplicate error [Preset: mainnet] OK @@ -318,4 +319,4 @@ OK: 2/2 Fail: 0/2 Skip: 0/2 OK: 1/1 Fail: 0/1 Skip: 0/1 ---TOTAL--- -OK: 175/184 Fail: 0/184 Skip: 9/184 +OK: 176/185 Fail: 0/185 Skip: 9/185 diff --git a/beacon_chain/beacon_node_types.nim b/beacon_chain/beacon_node_types.nim index 5e93c0562..dc506de4b 100644 --- a/beacon_chain/beacon_node_types.nim +++ b/beacon_chain/beacon_node_types.nim @@ -135,11 +135,11 @@ type AttestationSubnets* = object enabled*: bool - stabilitySubnets*: seq[tuple[subnet: uint8, expiration: Epoch]] + stabilitySubnets*: seq[tuple[subnet_id: SubnetId, expiration: Epoch]] nextCycleEpoch*: Epoch # These encode states in per-subnet state machines - subscribedSubnets*: set[uint8] + subscribedSubnets*: BitArray[ATTESTATION_SUBNET_COUNT] subscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot] unsubscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot] diff --git a/beacon_chain/consensus_object_pools/spec_cache.nim b/beacon_chain/consensus_object_pools/spec_cache.nim index 52c90a0d3..9660d93ac 100644 --- a/beacon_chain/consensus_object_pools/spec_cache.nim +++ b/beacon_chain/consensus_object_pools/spec_cache.nim @@ -24,6 +24,10 @@ func count_active_validators*(epochInfo: EpochRef): uint64 = func get_committee_count_per_slot*(epochInfo: EpochRef): uint64 = get_committee_count_per_slot(count_active_validators(epochInfo)) +iterator get_committee_indices*(epochRef: EpochRef): CommitteeIndex = + for i in 0'u64.. 0: yield ( includedIndices, idx, - compute_subnet_for_attestation(committees_per_slot, slot, idx).uint8, + compute_subnet_for_attestation(committees_per_slot, slot, idx), slot) diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 5f62e992b..6422957a2 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -187,11 +187,11 @@ proc checkForPotentialDoppelganger( proc attestationValidator*( self: ref Eth2Processor, attestation: Attestation, - attestation_subnet: uint64, - checksExpensive: bool = true): Future[ValidationResult] {.async.} = + subnet_id: SubnetId, + checkSignature: bool = true): Future[ValidationResult] {.async.} = logScope: attestation = shortLog(attestation) - attestation_subnet + subnet_id let wallTime = self.getWallTime() @@ -209,8 +209,7 @@ proc attestationValidator*( # Now proceed to validation let v = await self.attestationPool.validateAttestation( - self.batchCrypto, - attestation, wallTime, attestation_subnet, checksExpensive) + self.batchCrypto, attestation, wallTime, subnet_id, checkSignature) if v.isErr(): debug "Dropping attestation", err = v.error() return v.error[0] diff --git a/beacon_chain/gossip_processing/gossip_validation.nim b/beacon_chain/gossip_processing/gossip_validation.nim index 4d334d4a5..1071413d4 100644 --- a/beacon_chain/gossip_processing/gossip_validation.nim +++ b/beacon_chain/gossip_processing/gossip_validation.nim @@ -140,14 +140,14 @@ func check_aggregation_count( func check_attestation_subnet( epochRef: EpochRef, attestation: Attestation, - attestation_subnet: uint64): Result[void, (ValidationResult, cstring)] = + subnet_id: SubnetId): Result[void, (ValidationResult, cstring)] = let expectedSubnet = compute_subnet_for_attestation( get_committee_count_per_slot(epochRef), attestation.data.slot, attestation.data.index.CommitteeIndex) - if expectedSubnet != attestation_subnet: + if expectedSubnet != subnet_id: return err((ValidationResult.Reject, cstring( "Attestation not on the correct subnet"))) @@ -162,7 +162,7 @@ proc validateAttestation*( batchCrypto: ref BatchCrypto, attestation: Attestation, wallTime: BeaconTime, - attestation_subnet: uint64, checksExpensive: bool): + subnet_id: SubnetId, checkSignature: bool): Future[Result[tuple[attesting_index: ValidatorIndex, sig: CookedSig], (ValidationResult, cstring)]] {.async.} = # Some of the checks below have been reordered compared to the spec, to @@ -232,7 +232,7 @@ proc validateAttestation*( # attestation.data.target.epoch), which may be pre-computed along with the # committee information for the signature check. block: - let v = check_attestation_subnet(epochRef, attestation, attestation_subnet) # [REJECT] + let v = check_attestation_subnet(epochRef, attestation, subnet_id) # [REJECT] if v.isErr(): return err(v.error) @@ -271,14 +271,6 @@ proc validateAttestation*( return err((ValidationResult.Ignore, cstring( "Validator has already voted in epoch"))) - if not checksExpensive: - # Only sendAttestation, which discards result, doesn't use checksExpensive - # TODO this means that (a) this becomes an "expensive" check and (b) it is - # doing in-principle unnecessary work, since this should be known from the - # attestation creation. - return ok((validator_index, attestation.signature.load.get().CookedSig)) - - # The signature of attestation is valid. block: # First pass - without cryptography let v = is_valid_indexed_attestation( @@ -287,29 +279,38 @@ proc validateAttestation*( if v.isErr(): return err((ValidationResult.Reject, v.error)) - # Buffer crypto checks - let deferredCrypto = batchCrypto - .scheduleAttestationCheck( - fork, genesis_validators_root, epochRef, - attestation - ) - if deferredCrypto.isNone(): - return err((ValidationResult.Reject, - cstring("validateAttestation: crypto sanity checks failure"))) + let sig = + if checkSignature: + # Attestation signatures are batch-verified + let deferredCrypto = batchCrypto + .scheduleAttestationCheck( + fork, genesis_validators_root, epochRef, + attestation + ) + if deferredCrypto.isNone(): + return err((ValidationResult.Reject, + cstring("validateAttestation: crypto sanity checks failure"))) - # Await the crypto check - let - (cryptoFut, sig) = deferredCrypto.get() + # Await the crypto check + let + (cryptoFut, sig) = deferredCrypto.get() - var x = (await cryptoFut) - case x - of BatchResult.Invalid: - return err((ValidationResult.Reject, cstring("validateAttestation: invalid signature"))) - of BatchResult.Timeout: - beacon_attestations_dropped_queue_full.inc() - return err((ValidationResult.Ignore, cstring("validateAttestation: timeout checking signature"))) - of BatchResult.Valid: - discard # keep going only in this case + var x = (await cryptoFut) + case x + of BatchResult.Invalid: + return err((ValidationResult.Reject, cstring("validateAttestation: invalid signature"))) + of BatchResult.Timeout: + beacon_attestations_dropped_queue_full.inc() + return err((ValidationResult.Ignore, cstring("validateAttestation: timeout checking signature"))) + of BatchResult.Valid: + sig # keep going only in this case + else: + let sig = attestation.signature.load() + if not sig.isSome(): + return err(( + ValidationResult.Ignore, + cstring("validateAttestation: unable to load signature"))) + sig.get() # Only valid attestations go in the list, which keeps validator_index # in range diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 5b24c1dfb..4849c83dc 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -31,7 +31,7 @@ import libp2p/stream/connection, libp2p/utils/semaphore, eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl, - eth/net/nat, eth/p2p/discoveryv5/[enr, node], + eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2], ".."/[ version, conf, ssz/ssz_serialization, beacon_clock], @@ -1763,3 +1763,50 @@ proc broadcast*(node: Eth2Node, topic: string, msg: auto) = traceMessage(futSnappy, gossipId(uncompressed, true)) except IOError as exc: raiseAssert exc.msg # TODO in-memory compression shouldn't fail + +proc subscribeAttestationSubnets*( + node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT]) {. + raises: [Defect, CatchableError].} = + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation + # nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable + + for subnet_id, enabled in subnets: + if enabled: + node.subscribe(getAttestationTopic( + node.forkID.fork_digest, SubnetId(subnet_id)), TopicParams.init()) # don't score attestation subnets for now + +proc unsubscribeAttestationSubnets*( + node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT]) {. + raises: [Defect, CatchableError].} = + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation + # nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable + + for subnet_id, enabled in subnets: + if enabled: + node.unsubscribe(getAttestationTopic( + node.forkID.fork_digest, SubnetId(subnet_id))) + +proc updateStabilitySubnetMetadata*( + node: Eth2Node, attnets: BitArray[ATTESTATION_SUBNET_COUNT]) = + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#metadata + node.metadata.seq_number += 1 + node.metadata.attnets = attnets + + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability + # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestation-subnet-bitfield + let res = node.discovery.updateRecord( + {"attnets": SSZ.encode(node.metadata.attnets)}) + if res.isErr(): + # This should not occur in this scenario as the private key would always + # be the correct one and the ENR will not increase in size. + warn "Failed to update record on subnet cycle", error = res.error + else: + debug "Stability subnets changed; updated ENR attnets", attnets + +# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability +func getStabilitySubnetLength*(node: Eth2Node): uint64 = + EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION + + node.rng[].rand(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION.int).uint64 + +func getRandomSubnetId*(node: Eth2Node): SubnetId = + node.rng[].rand(ATTESTATION_SUBNET_COUNT - 1).SubnetId diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 8e09f1e30..286653bfc 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -384,8 +384,8 @@ proc init*(T: type BeaconNode, getVoluntaryExitsTopic(enrForkId.fork_digest), getAggregateAndProofsTopic(enrForkId.fork_digest) ] - for subnet in 0'u64 ..< ATTESTATION_SUBNET_COUNT: - topics &= getAttestationTopic(enrForkId.fork_digest, subnet) + for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT: + topics &= getAttestationTopic(enrForkId.fork_digest, SubnetId(subnet_id)) topics) if node.config.inProcessValidators: @@ -426,34 +426,9 @@ func verifyFinalization(node: BeaconNode, slot: Slot) = # finalization occurs every slot, to 4 slots vs scheduledSlot. doAssert finalizedEpoch + 4 >= epoch -proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) {. - raises: [Defect, CatchableError].} = - # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation - # nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable - for subnet in subnets: - node.network.subscribe(getAttestationTopic(node.forkDigest, subnet), TopicParams.init()) # don't score attestation subnets for now - -proc updateStabilitySubnetMetadata( - node: BeaconNode, stabilitySubnets: set[uint8]) = - # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#metadata - node.network.metadata.seq_number += 1 - for subnet in 0'u8 ..< ATTESTATION_SUBNET_COUNT: - node.network.metadata.attnets[subnet] = (subnet in stabilitySubnets) - - # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability - # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestation-subnet-bitfield - let res = node.network.discovery.updateRecord( - {"attnets": SSZ.encode(node.network.metadata.attnets)}) - if res.isErr(): - # This should not occur in this scenario as the private key would always - # be the correct one and the ENR will not increase in size. - warn "Failed to update record on subnet cycle", error = res.error - else: - debug "Stability subnets changed; updated ENR attnets", stabilitySubnets - -func getStabilitySubnets(stabilitySubnets: auto): set[uint8] = +func getStabilitySubnets(stabilitySubnets: auto): BitArray[ATTESTATION_SUBNET_COUNT] = for subnetInfo in stabilitySubnets: - result.incl subnetInfo.subnet + result[subnetInfo.subnet_id.int] = true proc getAttachedValidators(node: BeaconNode): Table[ValidatorIndex, AttachedValidator] = @@ -503,7 +478,7 @@ proc updateSubscriptionSchedule(node: BeaconNode, epoch: Epoch) {.async.} = # The relevant bitmaps are 32 bits each. static: doAssert SLOTS_PER_EPOCH <= 32 - for (validatorIndices, committeeIndex, subnetIndex, slot) in + for (validatorIndices, committeeIndex, subnet_id, slot) in get_committee_assignments(epochRef, epoch, validatorIndices): doAssert compute_epoch_at_slot(slot) == epoch @@ -530,43 +505,36 @@ proc updateSubscriptionSchedule(node: BeaconNode, epoch: Epoch) {.async.} = get_beacon_committee_len(epochRef, slot, committeeIndex), slot): continue - node.attestationSubnets.unsubscribeSlot[subnetIndex] = - max(slot + 1, node.attestationSubnets.unsubscribeSlot[subnetIndex]) - if subnetIndex notin node.attestationSubnets.subscribedSubnets: + node.attestationSubnets.unsubscribeSlot[subnet_id.uint64] = + max(slot + 1, node.attestationSubnets.unsubscribeSlot[subnet_id.uint64]) + if node.attestationSubnets.subscribedSubnets[subnet_id.uint64]: const SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS = 34 - node.attestationSubnets.subscribeSlot[subnetIndex] = + node.attestationSubnets.subscribeSlot[subnet_id.uint64] = # Queue upcoming subscription potentially earlier # SLOTS_PER_EPOCH emulates one boundary condition of the per-epoch # cycling mechanism timing buffers min( slot - min(slot.uint64, SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS), - node.attestationSubnets.subscribeSlot[subnetIndex]) + node.attestationSubnets.subscribeSlot[subnet_id.uint64]) -# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability -func getStabilitySubnetLength(node: BeaconNode): uint64 = - EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION + - node.network.rng[].rand(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION.int).uint64 - -func updateStabilitySubnets(node: BeaconNode, slot: Slot): set[uint8] = +func updateStabilitySubnets(node: BeaconNode, slot: Slot): BitArray[ATTESTATION_SUBNET_COUNT] = # Equivalent to wallSlot by cycleAttestationSubnets(), especially # since it'll try to run early in epochs, avoiding race conditions. - static: doAssert ATTESTATION_SUBNET_COUNT <= high(uint8) let epoch = slot.epoch # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability - for i in 0 ..< node.attestationSubnets.stabilitySubnets.len: - if epoch >= node.attestationSubnets.stabilitySubnets[i].expiration: - node.attestationSubnets.stabilitySubnets[i].subnet = - node.network.rng[].rand(ATTESTATION_SUBNET_COUNT - 1).uint8 - node.attestationSubnets.stabilitySubnets[i].expiration = - epoch + node.getStabilitySubnetLength() + for ss in node.attestationSubnets.stabilitySubnets.mitems(): + if epoch >= ss.expiration: + ss.subnet_id = node.network.getRandomSubnetId() + ss.expiration = epoch + node.network.getStabilitySubnetLength() - result.incl node.attestationSubnets.stabilitySubnets[i].subnet + result[ss.subnet_id.int] = true proc cycleAttestationSubnetsPerEpoch( - node: BeaconNode, wallSlot: Slot, prevStabilitySubnets: set[uint8]): - Future[set[uint8]] {.async.} = + node: BeaconNode, wallSlot: Slot, + prevStabilitySubnets: BitArray[ATTESTATION_SUBNET_COUNT]): + Future[BitArray[ATTESTATION_SUBNET_COUNT]] {.async.} = # Per-epoch portion of subnet cycling: updating stability subnets and # calculating future attestation subnets. @@ -595,12 +563,12 @@ proc cycleAttestationSubnetsPerEpoch( let stabilitySubnets = node.updateStabilitySubnets(wallSlot) - if not node.config.subscribeAllSubnets and + if not node.config.subscribeAllSubnets and stabilitySubnets != prevStabilitySubnets: # In subscribeAllSubnets mode, this only gets set once, at initial subnet # attestation handler creation, since they're all considered as stability # subnets in that case. - node.updateStabilitySubnetMetadata(stabilitySubnets) + node.network.updateStabilitySubnetMetadata(stabilitySubnets) return stabilitySubnets @@ -610,36 +578,34 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} = let prevSubscribedSubnets = node.attestationSubnets.subscribedSubnets - for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT: - if i in node.attestationSubnets.subscribedSubnets: + for i in 0..= node.attestationSubnets.unsubscribeSlot[i]: - node.attestationSubnets.subscribedSubnets.excl i + node.attestationSubnets.subscribedSubnets[i] = false else: if wallSlot >= node.attestationSubnets.subscribeSlot[i]: - node.attestationSubnets.subscribedSubnets.incl i + node.attestationSubnets.subscribedSubnets[i] = true let prevStabilitySubnets = - getStabilitySubnets(node.attestationSubnets.stabilitySubnets) + node.attestationSubnets.stabilitySubnets.getStabilitySubnets() stabilitySubnets = await node.cycleAttestationSubnetsPerEpoch(wallSlot, prevStabilitySubnets) # Accounting specific to non-stability subnets - for expiringSubnet in - prevSubscribedSubnets - node.attestationSubnets.subscribedSubnets: - node.attestationSubnets.subscribeSlot[expiringSubnet] = FAR_FUTURE_SLOT + for i, enabled in + (prevSubscribedSubnets - node.attestationSubnets.subscribedSubnets): + if enabled: + node.attestationSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT let prevAllSubnets = prevSubscribedSubnets + prevStabilitySubnets allSubnets = node.attestationSubnets.subscribedSubnets + stabilitySubnets - unsubscribedSubnets = prevAllSubnets - allSubnets - subscribedSubnets = allSubnets - prevAllSubnets + unsubscribeSubnets = prevAllSubnets - allSubnets + subscribeSubnets = allSubnets - prevAllSubnets - for subnet in unsubscribedSubnets: - node.network.unsubscribe( - getAttestationTopic(node.forkDigest, subnet)) - - node.installAttestationSubnetHandlers(subscribedSubnets) + node.network.unsubscribeAttestationSubnets(unsubscribeSubnets) + node.network.subscribeAttestationSubnets(subscribeSubnets) debug "Attestation subnets", expiringSubnets = @@ -652,10 +618,10 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} = num_stability_subnets = node.attestationSubnets.stabilitySubnets.len, expiring_stability_subnets = prevStabilitySubnets - stabilitySubnets, new_stability_subnets = stabilitySubnets - prevStabilitySubnets, - subscribedSubnets, - unsubscribedSubnets + subscribeSubnets, + unsubscribeSubnets -proc getInitialAttestationSubnets(node: BeaconNode): Table[uint8, Slot] = +proc getInitialAttestationSubnets(node: BeaconNode): Table[SubnetId, Slot] = let wallEpoch = node.beaconClock.now().slotOrZero().epoch validatorIndices = toIntSet(toSeq(node.getAttachedValidators().keys())) @@ -665,12 +631,12 @@ proc getInitialAttestationSubnets(node: BeaconNode): Table[uint8, Slot] = # https://github.com/nim-lang/Nim/issues/16217 are fixed, in # Nimbus's Nim, use (_, _, subnetIndex, slot). let epochRef = node.chainDag.getEpochRef(node.chainDag.head, epoch) - for (_, ci, subnetIndex, slot) in get_committee_assignments( + for (_, ci, subnet_id, slot) in get_committee_assignments( epochRef, epoch, validatorIndices): - result.withValue(subnetIndex, v) do: + result.withValue(subnet_id, v) do: v[] = max(v[], slot + 1) do: - result[subnetIndex] = slot + 1 + result[subnet_id] = slot + 1 # Either wallEpoch is 0, in which case it might be pre-genesis, but we only # care about the already-known first two epochs of attestations, or it's in @@ -680,7 +646,7 @@ proc getInitialAttestationSubnets(node: BeaconNode): Table[uint8, Slot] = mergeAttestationSubnets(wallEpoch) mergeAttestationSubnets(wallEpoch + 1) -proc getAttestationSubnetHandlers(node: BeaconNode) {. +proc subscribeAttestationSubnetHandlers(node: BeaconNode) {. raises: [Defect, CatchableError].} = # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability # TODO: @@ -688,53 +654,46 @@ proc getAttestationSubnetHandlers(node: BeaconNode) {. # - Restarting the node with a presistent netkey # - When going from synced -> syncing -> synced state - template getAllAttestationSubnets(): Table[uint8, Slot] = - var subnets: Table[uint8, Slot] - for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT: - subnets[i] = FAR_FUTURE_SLOT - subnets + if node.config.subscribeAllSubnets: + # In all-subnets mode, we create a stability subnet subscription for every + # subnet - this will be propagated in the attnets ENR entry + node.attestationSubnets.stabilitySubnets.setLen(ATTESTATION_SUBNET_COUNT) + for i, ss in node.attestationSubnets.stabilitySubnets.mpairs(): + ss.subnet_id = SubnetId(i) + ss.expiration = FAR_FUTURE_EPOCH + else: + let wallEpoch = node.beaconClock.now().slotOrZero().epoch + + # TODO make length dynamic when validator-client-based validators join and leave + # In normal mode, there's one subnet subscription per validator, changing + # randomly over time + node.attestationSubnets.stabilitySubnets.setLen( + node.attachedValidators[].count) + for i, ss in node.attestationSubnets.stabilitySubnets.mpairs(): + ss.subnet_id = node.network.getRandomSubnetId() + ss.expiration = wallEpoch + node.network.getStabilitySubnetLength() + + let initialStabilitySubnets = + node.attestationSubnets.stabilitySubnets.getStabilitySubnets() + node.network.updateStabilitySubnetMetadata(initialStabilitySubnets) let - initialSubnets = - if node.config.subscribeAllSubnets: - getAllAttestationSubnets() - else: - node.getInitialAttestationSubnets() - wallEpoch = node.beaconClock.now().slotOrZero().epoch - - var initialStabilitySubnets: set[uint8] - - node.attestationSubnets.stabilitySubnets.setLen( - node.attachedValidators[].count) - for i in 0 ..< node.attachedValidators[].count: - node.attestationSubnets.stabilitySubnets[i] = ( - subnet: node.network.rng[].rand(ATTESTATION_SUBNET_COUNT - 1).uint8, - expiration: wallEpoch + node.getStabilitySubnetLength()) - initialStabilitySubnets.incl( - node.attestationSubnets.stabilitySubnets[i].subnet) - - node.updateStabilitySubnetMetadata( - if node.config.subscribeAllSubnets: - {0'u8 .. (ATTESTATION_SUBNET_COUNT - 1)} - else: - node.attestationSubnets.stabilitySubnets.getStabilitySubnets) - + initialSubnets = node.getInitialAttestationSubnets() for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT: - if i in initialSubnets: - node.attestationSubnets.subscribedSubnets.incl i + if SubnetId(i) in initialSubnets: + node.attestationSubnets.subscribedSubnets[i] = true node.attestationSubnets.unsubscribeSlot[i] = - try: initialSubnets[i] except KeyError: raiseAssert "checked with in" + try: initialSubnets[SubnetId(i)] except KeyError: raiseAssert "checked with in" else: - node.attestationSubnets.subscribedSubnets.excl i + node.attestationSubnets.subscribedSubnets[i] = false node.attestationSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT node.attestationSubnets.enabled = true debug "Initial attestation subnets subscribed", initialSubnets, - initialStabilitySubnets, - wallEpoch - node.installAttestationSubnetHandlers( + initialStabilitySubnets + node.network.subscribeAttestationSubnets( node.attestationSubnets.subscribedSubnets + initialStabilitySubnets) proc addMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} = @@ -792,7 +751,7 @@ proc addMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} = node.network.subscribe(getProposerSlashingsTopic(node.forkDigest), basicParams) node.network.subscribe(getVoluntaryExitsTopic(node.forkDigest), basicParams) node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest), aggregateTopicParams, enableTopicMetrics = true) - node.getAttestationSubnetHandlers() + node.subscribeAttestationSubnetHandlers() func getTopicSubscriptionEnabled(node: BeaconNode): bool = node.attestationSubnets.enabled @@ -807,8 +766,9 @@ proc removeMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError]. node.network.unsubscribe(getAttesterSlashingsTopic(node.forkDigest)) node.network.unsubscribe(getAggregateAndProofsTopic(node.forkDigest)) - for subnet in 0'u64 ..< ATTESTATION_SUBNET_COUNT: - node.network.unsubscribe(getAttestationTopic(node.forkDigest, subnet)) + for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT: + node.network.unsubscribe( + getAttestationTopic(node.forkDigest, SubnetId(subnet_id))) proc setupDoppelgangerDetection(node: BeaconNode, slot: Slot) = # When another client's already running, this is very likely to detect @@ -1185,12 +1145,12 @@ proc installMessageValidators(node: BeaconNode) = # subnets are subscribed to during any given epoch. for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64: closureScope: - let attestation_subnet = it + let subnet_id = SubnetId(it) node.network.addAsyncValidator( - getAttestationTopic(node.forkDigest, attestation_subnet), + getAttestationTopic(node.forkDigest, subnet_id), # This proc needs to be within closureScope; don't lift out of loop. proc(attestation: Attestation): Future[ValidationResult] = - node.processor.attestationValidator(attestation, attestation_subnet)) + node.processor.attestationValidator(attestation, subnet_id)) node.network.addAsyncValidator( getAggregateAndProofsTopic(node.forkDigest), diff --git a/beacon_chain/nimbus_validator_client.nim b/beacon_chain/nimbus_validator_client.nim index c94dc45b0..af4542040 100644 --- a/beacon_chain/nimbus_validator_client.nim +++ b/beacon_chain/nimbus_validator_client.nim @@ -194,9 +194,12 @@ proc onSlotStart(vc: ValidatorClient, lastSlot, scheduledSlot: Slot) {.gcsafe, a ad, a.committee_length.int, a.validator_committee_index, vc.fork, vc.beaconGenesis.genesis_validators_root) - notice "Attesting", - slot, public_key = a.public_key, attestation = shortLog(attestation) - discard await vc.client.post_v1_beacon_pool_attestations(attestation) + notice "Sending attestation to beacon node", + public_key = a.public_key, attestation = shortLog(attestation) + let ok = await vc.client.post_v1_beacon_pool_attestations(attestation) + if not ok: + warn "Failed to send attestation to beacon node", + public_key = a.public_key, attestation = shortLog(attestation) validatorToAttestationDataRoot[a.public_key] = attestation.data.hash_tree_root else: diff --git a/beacon_chain/rpc/beacon_api.nim b/beacon_chain/rpc/beacon_api.nim index 97b2a4213..2353b4837 100644 --- a/beacon_chain/rpc/beacon_api.nim +++ b/beacon_chain/rpc/beacon_api.nim @@ -461,8 +461,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {. rpcServer.rpc("post_v1_beacon_pool_attestations") do ( attestation: Attestation) -> bool: - node.sendAttestation(attestation) - return true + return await node.sendAttestation(attestation) rpcServer.rpc("get_v1_beacon_pool_attester_slashings") do ( ) -> seq[AttesterSlashing]: diff --git a/beacon_chain/rpc/beacon_rest_api.nim b/beacon_chain/rpc/beacon_rest_api.nim index 43c614cb0..09bd3ee58 100644 --- a/beacon_chain/rpc/beacon_rest_api.nim +++ b/beacon_chain/rpc/beacon_rest_api.nim @@ -718,15 +718,9 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = var failures: seq[RestAttestationsFailureTuple] for atindex, attestation in attestations.pairs(): - let wallTime = node.processor.getWallTime() - let res = await node.attestationPool.validateAttestation( - node.processor.batchCrypto, attestation, wallTime, - attestation.data.index, true - ) - if res.isErr(): - failures.add((index: uint64(atindex), message: $res.error())) - else: - node.sendAttestation(attestation) + if not await node.sendAttestation(attestation): + failures.add( + (index: uint64(atindex), message: "Attestation failed validation")) if len(failures) > 0: return RestApiResponse.jsonErrorList(Http400, AttestationValidationError, diff --git a/beacon_chain/rpc/validator_api.nim b/beacon_chain/rpc/validator_api.nim index 30508e8ae..8194008ee 100644 --- a/beacon_chain/rpc/validator_api.nim +++ b/beacon_chain/rpc/validator_api.nim @@ -150,21 +150,21 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {. let head = node.doChecksAndGetCurrentHead(epoch) epochRef = node.chainDag.getEpochRef(head, epoch) - subnet = compute_subnet_for_attestation( - get_committee_count_per_slot(epochRef), slot, committee_index).uint8 + subnet_id = compute_subnet_for_attestation( + get_committee_count_per_slot(epochRef), slot, committee_index) # Either subnet already subscribed or not. If not, subscribe. If it is, # extend subscription. All one knows from the API combined with how far # ahead one can check for attestation schedule is that it might be used # for up to the end of next epoch. Therefore, arrange for subscriptions # to last at least that long. - if subnet notin node.attestationSubnets.subscribedSubnets: + if node.attestationSubnets.subscribedSubnets[subnet_id.uint64]: # When to subscribe. Since it's not clear when from the API it's first # needed, do so immediately. - node.attestationSubnets.subscribeSlot[subnet] = - min(node.attestationSubnets.subscribeSlot[subnet], wallSlot) + node.attestationSubnets.subscribeSlot[subnet_id.uint64] = + min(node.attestationSubnets.subscribeSlot[subnet_id.uint64], wallSlot) - node.attestationSubnets.unsubscribeSlot[subnet] = + node.attestationSubnets.unsubscribeSlot[subnet_id.uint64] = max( compute_start_slot_at_epoch(epoch + 2), - node.attestationSubnets.unsubscribeSlot[subnet]) + node.attestationSubnets.unsubscribeSlot[subnet_id.uint64]) diff --git a/beacon_chain/spec/datatypes/base.nim b/beacon_chain/spec/datatypes/base.nim index f5d7645f7..3eb73895e 100644 --- a/beacon_chain/spec/datatypes/base.nim +++ b/beacon_chain/spec/datatypes/base.nim @@ -149,6 +149,10 @@ type # leave it at spec size CommitteeIndex* = distinct uint64 + # The subnet id maps which gossip subscription to use to publish an + # attestation - it is distinct from the CommitteeIndex in particular + SubnetId* = distinct uint8 + Gwei* = uint64 # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#proposerslashing @@ -816,6 +820,18 @@ proc readValue*(reader: var JsonReader, value: var CommitteeIndex) {.raises: [IOError, SerializationError, Defect].} = value = CommitteeIndex reader.readValue(distinctBase CommitteeIndex) +proc writeValue*(writer: var JsonWriter, value: SubnetId) + {.raises: [IOError, Defect].} = + writeValue(writer, distinctBase value) + +proc readValue*(reader: var JsonReader, value: var SubnetId) + {.raises: [IOError, SerializationError, Defect].} = + let v = reader.readValue(distinctBase SubnetId) + if v > ATTESTATION_SUBNET_COUNT: + raiseUnexpectedValue( + reader, "Subnet id must be <= " & $ATTESTATION_SUBNET_COUNT) + value = SubnetId(v) + proc writeValue*(writer: var JsonWriter, value: HashList) {.raises: [IOError, SerializationError, Defect].} = writeValue(writer, value.data) @@ -883,6 +899,9 @@ proc `<`*(x, y: CommitteeIndex) : bool {.borrow, noSideEffect.} proc hash*(x: CommitteeIndex): Hash {.borrow, noSideEffect.} func `$`*(x: CommitteeIndex): auto = $(distinctBase(x)) +proc `==`*(x, y: SubnetId) : bool {.borrow, noSideEffect.} +proc `$`*(x: SubnetId): string {.borrow, noSideEffect.} + func `as`*(d: DepositData, T: type DepositMessage): T = T(pubkey: d.pubkey, withdrawal_credentials: d.withdrawal_credentials, @@ -1138,3 +1157,4 @@ static: # Sanity checks - these types should be trivial enough to copy with memcpy doAssert supportsCopyMem(Validator) doAssert supportsCopyMem(Eth2Digest) + doAssert ATTESTATION_SUBNET_COUNT <= high(distinctBase SubnetId).int diff --git a/beacon_chain/spec/network.nim b/beacon_chain/spec/network.nim index a03ea2eeb..bb9fdd2a8 100644 --- a/beacon_chain/spec/network.nim +++ b/beacon_chain/spec/network.nim @@ -53,7 +53,7 @@ func getAggregateAndProofsTopic*(forkDigest: ForkDigest): string = # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#broadcast-attestation func compute_subnet_for_attestation*( committees_per_slot: uint64, slot: Slot, committee_index: CommitteeIndex): - uint64 = + SubnetId = # Compute the correct subnet for an attestation for Phase 0. # Note, this mimics expected Phase 1 behavior where attestations will be # mapped to their shard subnet. @@ -62,16 +62,15 @@ func compute_subnet_for_attestation*( committees_since_epoch_start = committees_per_slot * slots_since_epoch_start - (committees_since_epoch_start + committee_index.uint64) mod - ATTESTATION_SUBNET_COUNT + SubnetId( + (committees_since_epoch_start + committee_index.uint64) mod + ATTESTATION_SUBNET_COUNT) # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#broadcast-attestation -func getAttestationTopic*(forkDigest: ForkDigest, subnetIndex: uint64): +func getAttestationTopic*(forkDigest: ForkDigest, subnet_id: SubnetId): string = ## For subscribing and unsubscribing to/from a subnet. - doAssert subnetIndex < ATTESTATION_SUBNET_COUNT - - eth2Prefix(forkDigest) & "beacon_attestation_" & $subnetIndex & "/ssz" + eth2Prefix(forkDigest) & "beacon_attestation_" & $uint64(subnet_id) & "/ssz" func getENRForkID*(fork: Fork, genesis_validators_root: Eth2Digest): ENRForkID = let diff --git a/beacon_chain/ssz/bitseqs.nim b/beacon_chain/ssz/bitseqs.nim index 3412d76c3..823c003d8 100644 --- a/beacon_chain/ssz/bitseqs.nim +++ b/beacon_chain/ssz/bitseqs.nim @@ -281,3 +281,29 @@ func countZeros*(x: BitSeq): int = template bytes*(x: BitSeq): untyped = seq[byte](x) + +iterator items*(x: BitArray): bool = + for i in 0..