Perform attestation check before broadcast (#2550)

Currently, we have a bit of a convoluted flow where when sending
attestations, we start broadcasting them over gossip then pass them to
the attestation validation to include them in the local attestation pool
- it should be the other way around: we should be checking attestations
_before_ gossipping them - this serves as an additional safety net to
ensure that we don't publish junk - this becomes more important when
publishing attestations from the API.

Also, the REST API was performing its own validation meaning
attestations coming from REST would be validated twice - finally, the
JSON RPC wasn't pre-validating and would happily broadcast invalid
attestations.

* Unified attestation production pipeline with the same flow for gossip,
locally and API-produced attestations: all are now validated and entered
into the pool, then broadcast/republished
* Refactor subnet handling with specific SubnetId alias, streamlining
where subnets are computed, avoiding the need to pass around the number
of active validators
* Move some of the subnet handling code to eth2_network
* Use BitArray throughout for subnet handling
This commit is contained in:
Jacek Sieka 2021-05-10 09:13:36 +02:00 committed by GitHub
parent 39da640beb
commit 867d8f3223
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 398 additions and 298 deletions

View File

@ -101,9 +101,10 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
```diff ```diff
+ iterating words OK + iterating words OK
+ overlaps OK + overlaps OK
+ roundtrips OK + roundtrips BitArray OK
+ roundtrips BitSeq OK
``` ```
OK: 3/3 Fail: 0/3 Skip: 0/3 OK: 4/4 Fail: 0/4 Skip: 0/4
## Block pool processing [Preset: mainnet] ## Block pool processing [Preset: mainnet]
```diff ```diff
+ Adding the same block twice returns a Duplicate error [Preset: mainnet] OK + Adding the same block twice returns a Duplicate error [Preset: mainnet] OK
@ -318,4 +319,4 @@ OK: 2/2 Fail: 0/2 Skip: 0/2
OK: 1/1 Fail: 0/1 Skip: 0/1 OK: 1/1 Fail: 0/1 Skip: 0/1
---TOTAL--- ---TOTAL---
OK: 175/184 Fail: 0/184 Skip: 9/184 OK: 176/185 Fail: 0/185 Skip: 9/185

View File

@ -135,11 +135,11 @@ type
AttestationSubnets* = object AttestationSubnets* = object
enabled*: bool enabled*: bool
stabilitySubnets*: seq[tuple[subnet: uint8, expiration: Epoch]] stabilitySubnets*: seq[tuple[subnet_id: SubnetId, expiration: Epoch]]
nextCycleEpoch*: Epoch nextCycleEpoch*: Epoch
# These encode states in per-subnet state machines # These encode states in per-subnet state machines
subscribedSubnets*: set[uint8] subscribedSubnets*: BitArray[ATTESTATION_SUBNET_COUNT]
subscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot] subscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot]
unsubscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot] unsubscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot]

View File

@ -24,6 +24,10 @@ func count_active_validators*(epochInfo: EpochRef): uint64 =
func get_committee_count_per_slot*(epochInfo: EpochRef): uint64 = func get_committee_count_per_slot*(epochInfo: EpochRef): uint64 =
get_committee_count_per_slot(count_active_validators(epochInfo)) get_committee_count_per_slot(count_active_validators(epochInfo))
iterator get_committee_indices*(epochRef: EpochRef): CommitteeIndex =
for i in 0'u64..<get_committee_count_per_slot(epochRef):
yield CommitteeIndex(i)
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#get_beacon_committee # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#get_beacon_committee
iterator get_beacon_committee*( iterator get_beacon_committee*(
epochRef: EpochRef, slot: Slot, index: CommitteeIndex): ValidatorIndex = epochRef: EpochRef, slot: Slot, index: CommitteeIndex): ValidatorIndex =
@ -225,7 +229,7 @@ iterator get_committee_assignments*(
epochRef: EpochRef, epoch: Epoch, validator_indices: IntSet): epochRef: EpochRef, epoch: Epoch, validator_indices: IntSet):
tuple[validatorIndices: IntSet, tuple[validatorIndices: IntSet,
committeeIndex: CommitteeIndex, committeeIndex: CommitteeIndex,
subnetIndex: uint8, slot: Slot] = subnet_id: SubnetId, slot: Slot] =
let let
committees_per_slot = get_committee_count_per_slot(epochRef) committees_per_slot = get_committee_count_per_slot(epochRef)
start_slot = compute_start_slot_at_epoch(epoch) start_slot = compute_start_slot_at_epoch(epoch)
@ -240,5 +244,5 @@ iterator get_committee_assignments*(
if includedIndices.len > 0: if includedIndices.len > 0:
yield ( yield (
includedIndices, idx, includedIndices, idx,
compute_subnet_for_attestation(committees_per_slot, slot, idx).uint8, compute_subnet_for_attestation(committees_per_slot, slot, idx),
slot) slot)

View File

@ -187,11 +187,11 @@ proc checkForPotentialDoppelganger(
proc attestationValidator*( proc attestationValidator*(
self: ref Eth2Processor, self: ref Eth2Processor,
attestation: Attestation, attestation: Attestation,
attestation_subnet: uint64, subnet_id: SubnetId,
checksExpensive: bool = true): Future[ValidationResult] {.async.} = checkSignature: bool = true): Future[ValidationResult] {.async.} =
logScope: logScope:
attestation = shortLog(attestation) attestation = shortLog(attestation)
attestation_subnet subnet_id
let let
wallTime = self.getWallTime() wallTime = self.getWallTime()
@ -209,8 +209,7 @@ proc attestationValidator*(
# Now proceed to validation # Now proceed to validation
let v = await self.attestationPool.validateAttestation( let v = await self.attestationPool.validateAttestation(
self.batchCrypto, self.batchCrypto, attestation, wallTime, subnet_id, checkSignature)
attestation, wallTime, attestation_subnet, checksExpensive)
if v.isErr(): if v.isErr():
debug "Dropping attestation", err = v.error() debug "Dropping attestation", err = v.error()
return v.error[0] return v.error[0]

View File

@ -140,14 +140,14 @@ func check_aggregation_count(
func check_attestation_subnet( func check_attestation_subnet(
epochRef: EpochRef, attestation: Attestation, epochRef: EpochRef, attestation: Attestation,
attestation_subnet: uint64): Result[void, (ValidationResult, cstring)] = subnet_id: SubnetId): Result[void, (ValidationResult, cstring)] =
let let
expectedSubnet = expectedSubnet =
compute_subnet_for_attestation( compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), get_committee_count_per_slot(epochRef),
attestation.data.slot, attestation.data.index.CommitteeIndex) attestation.data.slot, attestation.data.index.CommitteeIndex)
if expectedSubnet != attestation_subnet: if expectedSubnet != subnet_id:
return err((ValidationResult.Reject, cstring( return err((ValidationResult.Reject, cstring(
"Attestation not on the correct subnet"))) "Attestation not on the correct subnet")))
@ -162,7 +162,7 @@ proc validateAttestation*(
batchCrypto: ref BatchCrypto, batchCrypto: ref BatchCrypto,
attestation: Attestation, attestation: Attestation,
wallTime: BeaconTime, wallTime: BeaconTime,
attestation_subnet: uint64, checksExpensive: bool): subnet_id: SubnetId, checkSignature: bool):
Future[Result[tuple[attesting_index: ValidatorIndex, sig: CookedSig], Future[Result[tuple[attesting_index: ValidatorIndex, sig: CookedSig],
(ValidationResult, cstring)]] {.async.} = (ValidationResult, cstring)]] {.async.} =
# Some of the checks below have been reordered compared to the spec, to # Some of the checks below have been reordered compared to the spec, to
@ -232,7 +232,7 @@ proc validateAttestation*(
# attestation.data.target.epoch), which may be pre-computed along with the # attestation.data.target.epoch), which may be pre-computed along with the
# committee information for the signature check. # committee information for the signature check.
block: block:
let v = check_attestation_subnet(epochRef, attestation, attestation_subnet) # [REJECT] let v = check_attestation_subnet(epochRef, attestation, subnet_id) # [REJECT]
if v.isErr(): if v.isErr():
return err(v.error) return err(v.error)
@ -271,14 +271,6 @@ proc validateAttestation*(
return err((ValidationResult.Ignore, cstring( return err((ValidationResult.Ignore, cstring(
"Validator has already voted in epoch"))) "Validator has already voted in epoch")))
if not checksExpensive:
# Only sendAttestation, which discards result, doesn't use checksExpensive
# TODO this means that (a) this becomes an "expensive" check and (b) it is
# doing in-principle unnecessary work, since this should be known from the
# attestation creation.
return ok((validator_index, attestation.signature.load.get().CookedSig))
# The signature of attestation is valid.
block: block:
# First pass - without cryptography # First pass - without cryptography
let v = is_valid_indexed_attestation( let v = is_valid_indexed_attestation(
@ -287,29 +279,38 @@ proc validateAttestation*(
if v.isErr(): if v.isErr():
return err((ValidationResult.Reject, v.error)) return err((ValidationResult.Reject, v.error))
# Buffer crypto checks let sig =
let deferredCrypto = batchCrypto if checkSignature:
.scheduleAttestationCheck( # Attestation signatures are batch-verified
fork, genesis_validators_root, epochRef, let deferredCrypto = batchCrypto
attestation .scheduleAttestationCheck(
) fork, genesis_validators_root, epochRef,
if deferredCrypto.isNone(): attestation
return err((ValidationResult.Reject, )
cstring("validateAttestation: crypto sanity checks failure"))) if deferredCrypto.isNone():
return err((ValidationResult.Reject,
cstring("validateAttestation: crypto sanity checks failure")))
# Await the crypto check # Await the crypto check
let let
(cryptoFut, sig) = deferredCrypto.get() (cryptoFut, sig) = deferredCrypto.get()
var x = (await cryptoFut) var x = (await cryptoFut)
case x case x
of BatchResult.Invalid: of BatchResult.Invalid:
return err((ValidationResult.Reject, cstring("validateAttestation: invalid signature"))) return err((ValidationResult.Reject, cstring("validateAttestation: invalid signature")))
of BatchResult.Timeout: of BatchResult.Timeout:
beacon_attestations_dropped_queue_full.inc() beacon_attestations_dropped_queue_full.inc()
return err((ValidationResult.Ignore, cstring("validateAttestation: timeout checking signature"))) return err((ValidationResult.Ignore, cstring("validateAttestation: timeout checking signature")))
of BatchResult.Valid: of BatchResult.Valid:
discard # keep going only in this case sig # keep going only in this case
else:
let sig = attestation.signature.load()
if not sig.isSome():
return err((
ValidationResult.Ignore,
cstring("validateAttestation: unable to load signature")))
sig.get()
# Only valid attestations go in the list, which keeps validator_index # Only valid attestations go in the list, which keeps validator_index
# in range # in range

View File

@ -31,7 +31,7 @@ import
libp2p/stream/connection, libp2p/stream/connection,
libp2p/utils/semaphore, libp2p/utils/semaphore,
eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl, eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl,
eth/net/nat, eth/p2p/discoveryv5/[enr, node], eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2],
".."/[ ".."/[
version, conf, version, conf,
ssz/ssz_serialization, beacon_clock], ssz/ssz_serialization, beacon_clock],
@ -1763,3 +1763,50 @@ proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
traceMessage(futSnappy, gossipId(uncompressed, true)) traceMessage(futSnappy, gossipId(uncompressed, true))
except IOError as exc: except IOError as exc:
raiseAssert exc.msg # TODO in-memory compression shouldn't fail raiseAssert exc.msg # TODO in-memory compression shouldn't fail
proc subscribeAttestationSubnets*(
node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT]) {.
raises: [Defect, CatchableError].} =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation
# nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable
for subnet_id, enabled in subnets:
if enabled:
node.subscribe(getAttestationTopic(
node.forkID.fork_digest, SubnetId(subnet_id)), TopicParams.init()) # don't score attestation subnets for now
proc unsubscribeAttestationSubnets*(
node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT]) {.
raises: [Defect, CatchableError].} =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation
# nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable
for subnet_id, enabled in subnets:
if enabled:
node.unsubscribe(getAttestationTopic(
node.forkID.fork_digest, SubnetId(subnet_id)))
proc updateStabilitySubnetMetadata*(
node: Eth2Node, attnets: BitArray[ATTESTATION_SUBNET_COUNT]) =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#metadata
node.metadata.seq_number += 1
node.metadata.attnets = attnets
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestation-subnet-bitfield
let res = node.discovery.updateRecord(
{"attnets": SSZ.encode(node.metadata.attnets)})
if res.isErr():
# This should not occur in this scenario as the private key would always
# be the correct one and the ENR will not increase in size.
warn "Failed to update record on subnet cycle", error = res.error
else:
debug "Stability subnets changed; updated ENR attnets", attnets
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
func getStabilitySubnetLength*(node: Eth2Node): uint64 =
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION +
node.rng[].rand(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION.int).uint64
func getRandomSubnetId*(node: Eth2Node): SubnetId =
node.rng[].rand(ATTESTATION_SUBNET_COUNT - 1).SubnetId

View File

@ -384,8 +384,8 @@ proc init*(T: type BeaconNode,
getVoluntaryExitsTopic(enrForkId.fork_digest), getVoluntaryExitsTopic(enrForkId.fork_digest),
getAggregateAndProofsTopic(enrForkId.fork_digest) getAggregateAndProofsTopic(enrForkId.fork_digest)
] ]
for subnet in 0'u64 ..< ATTESTATION_SUBNET_COUNT: for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT:
topics &= getAttestationTopic(enrForkId.fork_digest, subnet) topics &= getAttestationTopic(enrForkId.fork_digest, SubnetId(subnet_id))
topics) topics)
if node.config.inProcessValidators: if node.config.inProcessValidators:
@ -426,34 +426,9 @@ func verifyFinalization(node: BeaconNode, slot: Slot) =
# finalization occurs every slot, to 4 slots vs scheduledSlot. # finalization occurs every slot, to 4 slots vs scheduledSlot.
doAssert finalizedEpoch + 4 >= epoch doAssert finalizedEpoch + 4 >= epoch
proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) {. func getStabilitySubnets(stabilitySubnets: auto): BitArray[ATTESTATION_SUBNET_COUNT] =
raises: [Defect, CatchableError].} =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation
# nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable
for subnet in subnets:
node.network.subscribe(getAttestationTopic(node.forkDigest, subnet), TopicParams.init()) # don't score attestation subnets for now
proc updateStabilitySubnetMetadata(
node: BeaconNode, stabilitySubnets: set[uint8]) =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#metadata
node.network.metadata.seq_number += 1
for subnet in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
node.network.metadata.attnets[subnet] = (subnet in stabilitySubnets)
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestation-subnet-bitfield
let res = node.network.discovery.updateRecord(
{"attnets": SSZ.encode(node.network.metadata.attnets)})
if res.isErr():
# This should not occur in this scenario as the private key would always
# be the correct one and the ENR will not increase in size.
warn "Failed to update record on subnet cycle", error = res.error
else:
debug "Stability subnets changed; updated ENR attnets", stabilitySubnets
func getStabilitySubnets(stabilitySubnets: auto): set[uint8] =
for subnetInfo in stabilitySubnets: for subnetInfo in stabilitySubnets:
result.incl subnetInfo.subnet result[subnetInfo.subnet_id.int] = true
proc getAttachedValidators(node: BeaconNode): proc getAttachedValidators(node: BeaconNode):
Table[ValidatorIndex, AttachedValidator] = Table[ValidatorIndex, AttachedValidator] =
@ -503,7 +478,7 @@ proc updateSubscriptionSchedule(node: BeaconNode, epoch: Epoch) {.async.} =
# The relevant bitmaps are 32 bits each. # The relevant bitmaps are 32 bits each.
static: doAssert SLOTS_PER_EPOCH <= 32 static: doAssert SLOTS_PER_EPOCH <= 32
for (validatorIndices, committeeIndex, subnetIndex, slot) in for (validatorIndices, committeeIndex, subnet_id, slot) in
get_committee_assignments(epochRef, epoch, validatorIndices): get_committee_assignments(epochRef, epoch, validatorIndices):
doAssert compute_epoch_at_slot(slot) == epoch doAssert compute_epoch_at_slot(slot) == epoch
@ -530,43 +505,36 @@ proc updateSubscriptionSchedule(node: BeaconNode, epoch: Epoch) {.async.} =
get_beacon_committee_len(epochRef, slot, committeeIndex), slot): get_beacon_committee_len(epochRef, slot, committeeIndex), slot):
continue continue
node.attestationSubnets.unsubscribeSlot[subnetIndex] = node.attestationSubnets.unsubscribeSlot[subnet_id.uint64] =
max(slot + 1, node.attestationSubnets.unsubscribeSlot[subnetIndex]) max(slot + 1, node.attestationSubnets.unsubscribeSlot[subnet_id.uint64])
if subnetIndex notin node.attestationSubnets.subscribedSubnets: if node.attestationSubnets.subscribedSubnets[subnet_id.uint64]:
const SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS = 34 const SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS = 34
node.attestationSubnets.subscribeSlot[subnetIndex] = node.attestationSubnets.subscribeSlot[subnet_id.uint64] =
# Queue upcoming subscription potentially earlier # Queue upcoming subscription potentially earlier
# SLOTS_PER_EPOCH emulates one boundary condition of the per-epoch # SLOTS_PER_EPOCH emulates one boundary condition of the per-epoch
# cycling mechanism timing buffers # cycling mechanism timing buffers
min( min(
slot - min(slot.uint64, SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS), slot - min(slot.uint64, SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS),
node.attestationSubnets.subscribeSlot[subnetIndex]) node.attestationSubnets.subscribeSlot[subnet_id.uint64])
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability func updateStabilitySubnets(node: BeaconNode, slot: Slot): BitArray[ATTESTATION_SUBNET_COUNT] =
func getStabilitySubnetLength(node: BeaconNode): uint64 =
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION +
node.network.rng[].rand(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION.int).uint64
func updateStabilitySubnets(node: BeaconNode, slot: Slot): set[uint8] =
# Equivalent to wallSlot by cycleAttestationSubnets(), especially # Equivalent to wallSlot by cycleAttestationSubnets(), especially
# since it'll try to run early in epochs, avoiding race conditions. # since it'll try to run early in epochs, avoiding race conditions.
static: doAssert ATTESTATION_SUBNET_COUNT <= high(uint8)
let epoch = slot.epoch let epoch = slot.epoch
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
for i in 0 ..< node.attestationSubnets.stabilitySubnets.len: for ss in node.attestationSubnets.stabilitySubnets.mitems():
if epoch >= node.attestationSubnets.stabilitySubnets[i].expiration: if epoch >= ss.expiration:
node.attestationSubnets.stabilitySubnets[i].subnet = ss.subnet_id = node.network.getRandomSubnetId()
node.network.rng[].rand(ATTESTATION_SUBNET_COUNT - 1).uint8 ss.expiration = epoch + node.network.getStabilitySubnetLength()
node.attestationSubnets.stabilitySubnets[i].expiration =
epoch + node.getStabilitySubnetLength()
result.incl node.attestationSubnets.stabilitySubnets[i].subnet result[ss.subnet_id.int] = true
proc cycleAttestationSubnetsPerEpoch( proc cycleAttestationSubnetsPerEpoch(
node: BeaconNode, wallSlot: Slot, prevStabilitySubnets: set[uint8]): node: BeaconNode, wallSlot: Slot,
Future[set[uint8]] {.async.} = prevStabilitySubnets: BitArray[ATTESTATION_SUBNET_COUNT]):
Future[BitArray[ATTESTATION_SUBNET_COUNT]] {.async.} =
# Per-epoch portion of subnet cycling: updating stability subnets and # Per-epoch portion of subnet cycling: updating stability subnets and
# calculating future attestation subnets. # calculating future attestation subnets.
@ -595,12 +563,12 @@ proc cycleAttestationSubnetsPerEpoch(
let stabilitySubnets = node.updateStabilitySubnets(wallSlot) let stabilitySubnets = node.updateStabilitySubnets(wallSlot)
if not node.config.subscribeAllSubnets and if not node.config.subscribeAllSubnets and
stabilitySubnets != prevStabilitySubnets: stabilitySubnets != prevStabilitySubnets:
# In subscribeAllSubnets mode, this only gets set once, at initial subnet # In subscribeAllSubnets mode, this only gets set once, at initial subnet
# attestation handler creation, since they're all considered as stability # attestation handler creation, since they're all considered as stability
# subnets in that case. # subnets in that case.
node.updateStabilitySubnetMetadata(stabilitySubnets) node.network.updateStabilitySubnetMetadata(stabilitySubnets)
return stabilitySubnets return stabilitySubnets
@ -610,36 +578,34 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} =
let prevSubscribedSubnets = node.attestationSubnets.subscribedSubnets let prevSubscribedSubnets = node.attestationSubnets.subscribedSubnets
for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT: for i in 0..<node.attestationSubnets.subscribedSubnets.len():
if i in node.attestationSubnets.subscribedSubnets: if node.attestationSubnets.subscribedSubnets[i]:
if wallSlot >= node.attestationSubnets.unsubscribeSlot[i]: if wallSlot >= node.attestationSubnets.unsubscribeSlot[i]:
node.attestationSubnets.subscribedSubnets.excl i node.attestationSubnets.subscribedSubnets[i] = false
else: else:
if wallSlot >= node.attestationSubnets.subscribeSlot[i]: if wallSlot >= node.attestationSubnets.subscribeSlot[i]:
node.attestationSubnets.subscribedSubnets.incl i node.attestationSubnets.subscribedSubnets[i] = true
let let
prevStabilitySubnets = prevStabilitySubnets =
getStabilitySubnets(node.attestationSubnets.stabilitySubnets) node.attestationSubnets.stabilitySubnets.getStabilitySubnets()
stabilitySubnets = stabilitySubnets =
await node.cycleAttestationSubnetsPerEpoch(wallSlot, prevStabilitySubnets) await node.cycleAttestationSubnetsPerEpoch(wallSlot, prevStabilitySubnets)
# Accounting specific to non-stability subnets # Accounting specific to non-stability subnets
for expiringSubnet in for i, enabled in
prevSubscribedSubnets - node.attestationSubnets.subscribedSubnets: (prevSubscribedSubnets - node.attestationSubnets.subscribedSubnets):
node.attestationSubnets.subscribeSlot[expiringSubnet] = FAR_FUTURE_SLOT if enabled:
node.attestationSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT
let let
prevAllSubnets = prevSubscribedSubnets + prevStabilitySubnets prevAllSubnets = prevSubscribedSubnets + prevStabilitySubnets
allSubnets = node.attestationSubnets.subscribedSubnets + stabilitySubnets allSubnets = node.attestationSubnets.subscribedSubnets + stabilitySubnets
unsubscribedSubnets = prevAllSubnets - allSubnets unsubscribeSubnets = prevAllSubnets - allSubnets
subscribedSubnets = allSubnets - prevAllSubnets subscribeSubnets = allSubnets - prevAllSubnets
for subnet in unsubscribedSubnets: node.network.unsubscribeAttestationSubnets(unsubscribeSubnets)
node.network.unsubscribe( node.network.subscribeAttestationSubnets(subscribeSubnets)
getAttestationTopic(node.forkDigest, subnet))
node.installAttestationSubnetHandlers(subscribedSubnets)
debug "Attestation subnets", debug "Attestation subnets",
expiringSubnets = expiringSubnets =
@ -652,10 +618,10 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} =
num_stability_subnets = node.attestationSubnets.stabilitySubnets.len, num_stability_subnets = node.attestationSubnets.stabilitySubnets.len,
expiring_stability_subnets = prevStabilitySubnets - stabilitySubnets, expiring_stability_subnets = prevStabilitySubnets - stabilitySubnets,
new_stability_subnets = stabilitySubnets - prevStabilitySubnets, new_stability_subnets = stabilitySubnets - prevStabilitySubnets,
subscribedSubnets, subscribeSubnets,
unsubscribedSubnets unsubscribeSubnets
proc getInitialAttestationSubnets(node: BeaconNode): Table[uint8, Slot] = proc getInitialAttestationSubnets(node: BeaconNode): Table[SubnetId, Slot] =
let let
wallEpoch = node.beaconClock.now().slotOrZero().epoch wallEpoch = node.beaconClock.now().slotOrZero().epoch
validatorIndices = toIntSet(toSeq(node.getAttachedValidators().keys())) validatorIndices = toIntSet(toSeq(node.getAttachedValidators().keys()))
@ -665,12 +631,12 @@ proc getInitialAttestationSubnets(node: BeaconNode): Table[uint8, Slot] =
# https://github.com/nim-lang/Nim/issues/16217 are fixed, in # https://github.com/nim-lang/Nim/issues/16217 are fixed, in
# Nimbus's Nim, use (_, _, subnetIndex, slot). # Nimbus's Nim, use (_, _, subnetIndex, slot).
let epochRef = node.chainDag.getEpochRef(node.chainDag.head, epoch) let epochRef = node.chainDag.getEpochRef(node.chainDag.head, epoch)
for (_, ci, subnetIndex, slot) in get_committee_assignments( for (_, ci, subnet_id, slot) in get_committee_assignments(
epochRef, epoch, validatorIndices): epochRef, epoch, validatorIndices):
result.withValue(subnetIndex, v) do: result.withValue(subnet_id, v) do:
v[] = max(v[], slot + 1) v[] = max(v[], slot + 1)
do: do:
result[subnetIndex] = slot + 1 result[subnet_id] = slot + 1
# Either wallEpoch is 0, in which case it might be pre-genesis, but we only # Either wallEpoch is 0, in which case it might be pre-genesis, but we only
# care about the already-known first two epochs of attestations, or it's in # care about the already-known first two epochs of attestations, or it's in
@ -680,7 +646,7 @@ proc getInitialAttestationSubnets(node: BeaconNode): Table[uint8, Slot] =
mergeAttestationSubnets(wallEpoch) mergeAttestationSubnets(wallEpoch)
mergeAttestationSubnets(wallEpoch + 1) mergeAttestationSubnets(wallEpoch + 1)
proc getAttestationSubnetHandlers(node: BeaconNode) {. proc subscribeAttestationSubnetHandlers(node: BeaconNode) {.
raises: [Defect, CatchableError].} = raises: [Defect, CatchableError].} =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
# TODO: # TODO:
@ -688,53 +654,46 @@ proc getAttestationSubnetHandlers(node: BeaconNode) {.
# - Restarting the node with a presistent netkey # - Restarting the node with a presistent netkey
# - When going from synced -> syncing -> synced state # - When going from synced -> syncing -> synced state
template getAllAttestationSubnets(): Table[uint8, Slot] = if node.config.subscribeAllSubnets:
var subnets: Table[uint8, Slot] # In all-subnets mode, we create a stability subnet subscription for every
for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT: # subnet - this will be propagated in the attnets ENR entry
subnets[i] = FAR_FUTURE_SLOT node.attestationSubnets.stabilitySubnets.setLen(ATTESTATION_SUBNET_COUNT)
subnets for i, ss in node.attestationSubnets.stabilitySubnets.mpairs():
ss.subnet_id = SubnetId(i)
ss.expiration = FAR_FUTURE_EPOCH
else:
let wallEpoch = node.beaconClock.now().slotOrZero().epoch
# TODO make length dynamic when validator-client-based validators join and leave
# In normal mode, there's one subnet subscription per validator, changing
# randomly over time
node.attestationSubnets.stabilitySubnets.setLen(
node.attachedValidators[].count)
for i, ss in node.attestationSubnets.stabilitySubnets.mpairs():
ss.subnet_id = node.network.getRandomSubnetId()
ss.expiration = wallEpoch + node.network.getStabilitySubnetLength()
let initialStabilitySubnets =
node.attestationSubnets.stabilitySubnets.getStabilitySubnets()
node.network.updateStabilitySubnetMetadata(initialStabilitySubnets)
let let
initialSubnets = initialSubnets = node.getInitialAttestationSubnets()
if node.config.subscribeAllSubnets:
getAllAttestationSubnets()
else:
node.getInitialAttestationSubnets()
wallEpoch = node.beaconClock.now().slotOrZero().epoch
var initialStabilitySubnets: set[uint8]
node.attestationSubnets.stabilitySubnets.setLen(
node.attachedValidators[].count)
for i in 0 ..< node.attachedValidators[].count:
node.attestationSubnets.stabilitySubnets[i] = (
subnet: node.network.rng[].rand(ATTESTATION_SUBNET_COUNT - 1).uint8,
expiration: wallEpoch + node.getStabilitySubnetLength())
initialStabilitySubnets.incl(
node.attestationSubnets.stabilitySubnets[i].subnet)
node.updateStabilitySubnetMetadata(
if node.config.subscribeAllSubnets:
{0'u8 .. (ATTESTATION_SUBNET_COUNT - 1)}
else:
node.attestationSubnets.stabilitySubnets.getStabilitySubnets)
for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT: for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
if i in initialSubnets: if SubnetId(i) in initialSubnets:
node.attestationSubnets.subscribedSubnets.incl i node.attestationSubnets.subscribedSubnets[i] = true
node.attestationSubnets.unsubscribeSlot[i] = node.attestationSubnets.unsubscribeSlot[i] =
try: initialSubnets[i] except KeyError: raiseAssert "checked with in" try: initialSubnets[SubnetId(i)] except KeyError: raiseAssert "checked with in"
else: else:
node.attestationSubnets.subscribedSubnets.excl i node.attestationSubnets.subscribedSubnets[i] = false
node.attestationSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT node.attestationSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT
node.attestationSubnets.enabled = true node.attestationSubnets.enabled = true
debug "Initial attestation subnets subscribed", debug "Initial attestation subnets subscribed",
initialSubnets, initialSubnets,
initialStabilitySubnets, initialStabilitySubnets
wallEpoch node.network.subscribeAttestationSubnets(
node.installAttestationSubnetHandlers(
node.attestationSubnets.subscribedSubnets + initialStabilitySubnets) node.attestationSubnets.subscribedSubnets + initialStabilitySubnets)
proc addMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} = proc addMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} =
@ -792,7 +751,7 @@ proc addMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} =
node.network.subscribe(getProposerSlashingsTopic(node.forkDigest), basicParams) node.network.subscribe(getProposerSlashingsTopic(node.forkDigest), basicParams)
node.network.subscribe(getVoluntaryExitsTopic(node.forkDigest), basicParams) node.network.subscribe(getVoluntaryExitsTopic(node.forkDigest), basicParams)
node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest), aggregateTopicParams, enableTopicMetrics = true) node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest), aggregateTopicParams, enableTopicMetrics = true)
node.getAttestationSubnetHandlers() node.subscribeAttestationSubnetHandlers()
func getTopicSubscriptionEnabled(node: BeaconNode): bool = func getTopicSubscriptionEnabled(node: BeaconNode): bool =
node.attestationSubnets.enabled node.attestationSubnets.enabled
@ -807,8 +766,9 @@ proc removeMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].
node.network.unsubscribe(getAttesterSlashingsTopic(node.forkDigest)) node.network.unsubscribe(getAttesterSlashingsTopic(node.forkDigest))
node.network.unsubscribe(getAggregateAndProofsTopic(node.forkDigest)) node.network.unsubscribe(getAggregateAndProofsTopic(node.forkDigest))
for subnet in 0'u64 ..< ATTESTATION_SUBNET_COUNT: for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT:
node.network.unsubscribe(getAttestationTopic(node.forkDigest, subnet)) node.network.unsubscribe(
getAttestationTopic(node.forkDigest, SubnetId(subnet_id)))
proc setupDoppelgangerDetection(node: BeaconNode, slot: Slot) = proc setupDoppelgangerDetection(node: BeaconNode, slot: Slot) =
# When another client's already running, this is very likely to detect # When another client's already running, this is very likely to detect
@ -1185,12 +1145,12 @@ proc installMessageValidators(node: BeaconNode) =
# subnets are subscribed to during any given epoch. # subnets are subscribed to during any given epoch.
for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64: for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64:
closureScope: closureScope:
let attestation_subnet = it let subnet_id = SubnetId(it)
node.network.addAsyncValidator( node.network.addAsyncValidator(
getAttestationTopic(node.forkDigest, attestation_subnet), getAttestationTopic(node.forkDigest, subnet_id),
# This proc needs to be within closureScope; don't lift out of loop. # This proc needs to be within closureScope; don't lift out of loop.
proc(attestation: Attestation): Future[ValidationResult] = proc(attestation: Attestation): Future[ValidationResult] =
node.processor.attestationValidator(attestation, attestation_subnet)) node.processor.attestationValidator(attestation, subnet_id))
node.network.addAsyncValidator( node.network.addAsyncValidator(
getAggregateAndProofsTopic(node.forkDigest), getAggregateAndProofsTopic(node.forkDigest),

View File

@ -194,9 +194,12 @@ proc onSlotStart(vc: ValidatorClient, lastSlot, scheduledSlot: Slot) {.gcsafe, a
ad, a.committee_length.int, a.validator_committee_index, ad, a.committee_length.int, a.validator_committee_index,
vc.fork, vc.beaconGenesis.genesis_validators_root) vc.fork, vc.beaconGenesis.genesis_validators_root)
notice "Attesting", notice "Sending attestation to beacon node",
slot, public_key = a.public_key, attestation = shortLog(attestation) public_key = a.public_key, attestation = shortLog(attestation)
discard await vc.client.post_v1_beacon_pool_attestations(attestation) let ok = await vc.client.post_v1_beacon_pool_attestations(attestation)
if not ok:
warn "Failed to send attestation to beacon node",
public_key = a.public_key, attestation = shortLog(attestation)
validatorToAttestationDataRoot[a.public_key] = attestation.data.hash_tree_root validatorToAttestationDataRoot[a.public_key] = attestation.data.hash_tree_root
else: else:

View File

@ -461,8 +461,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
rpcServer.rpc("post_v1_beacon_pool_attestations") do ( rpcServer.rpc("post_v1_beacon_pool_attestations") do (
attestation: Attestation) -> bool: attestation: Attestation) -> bool:
node.sendAttestation(attestation) return await node.sendAttestation(attestation)
return true
rpcServer.rpc("get_v1_beacon_pool_attester_slashings") do ( rpcServer.rpc("get_v1_beacon_pool_attester_slashings") do (
) -> seq[AttesterSlashing]: ) -> seq[AttesterSlashing]:

View File

@ -718,15 +718,9 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
var failures: seq[RestAttestationsFailureTuple] var failures: seq[RestAttestationsFailureTuple]
for atindex, attestation in attestations.pairs(): for atindex, attestation in attestations.pairs():
let wallTime = node.processor.getWallTime() if not await node.sendAttestation(attestation):
let res = await node.attestationPool.validateAttestation( failures.add(
node.processor.batchCrypto, attestation, wallTime, (index: uint64(atindex), message: "Attestation failed validation"))
attestation.data.index, true
)
if res.isErr():
failures.add((index: uint64(atindex), message: $res.error()))
else:
node.sendAttestation(attestation)
if len(failures) > 0: if len(failures) > 0:
return RestApiResponse.jsonErrorList(Http400, AttestationValidationError, return RestApiResponse.jsonErrorList(Http400, AttestationValidationError,

View File

@ -150,21 +150,21 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
let let
head = node.doChecksAndGetCurrentHead(epoch) head = node.doChecksAndGetCurrentHead(epoch)
epochRef = node.chainDag.getEpochRef(head, epoch) epochRef = node.chainDag.getEpochRef(head, epoch)
subnet = compute_subnet_for_attestation( subnet_id = compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), slot, committee_index).uint8 get_committee_count_per_slot(epochRef), slot, committee_index)
# Either subnet already subscribed or not. If not, subscribe. If it is, # Either subnet already subscribed or not. If not, subscribe. If it is,
# extend subscription. All one knows from the API combined with how far # extend subscription. All one knows from the API combined with how far
# ahead one can check for attestation schedule is that it might be used # ahead one can check for attestation schedule is that it might be used
# for up to the end of next epoch. Therefore, arrange for subscriptions # for up to the end of next epoch. Therefore, arrange for subscriptions
# to last at least that long. # to last at least that long.
if subnet notin node.attestationSubnets.subscribedSubnets: if node.attestationSubnets.subscribedSubnets[subnet_id.uint64]:
# When to subscribe. Since it's not clear when from the API it's first # When to subscribe. Since it's not clear when from the API it's first
# needed, do so immediately. # needed, do so immediately.
node.attestationSubnets.subscribeSlot[subnet] = node.attestationSubnets.subscribeSlot[subnet_id.uint64] =
min(node.attestationSubnets.subscribeSlot[subnet], wallSlot) min(node.attestationSubnets.subscribeSlot[subnet_id.uint64], wallSlot)
node.attestationSubnets.unsubscribeSlot[subnet] = node.attestationSubnets.unsubscribeSlot[subnet_id.uint64] =
max( max(
compute_start_slot_at_epoch(epoch + 2), compute_start_slot_at_epoch(epoch + 2),
node.attestationSubnets.unsubscribeSlot[subnet]) node.attestationSubnets.unsubscribeSlot[subnet_id.uint64])

View File

@ -149,6 +149,10 @@ type
# leave it at spec size # leave it at spec size
CommitteeIndex* = distinct uint64 CommitteeIndex* = distinct uint64
# The subnet id maps which gossip subscription to use to publish an
# attestation - it is distinct from the CommitteeIndex in particular
SubnetId* = distinct uint8
Gwei* = uint64 Gwei* = uint64
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#proposerslashing # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/beacon-chain.md#proposerslashing
@ -816,6 +820,18 @@ proc readValue*(reader: var JsonReader, value: var CommitteeIndex)
{.raises: [IOError, SerializationError, Defect].} = {.raises: [IOError, SerializationError, Defect].} =
value = CommitteeIndex reader.readValue(distinctBase CommitteeIndex) value = CommitteeIndex reader.readValue(distinctBase CommitteeIndex)
proc writeValue*(writer: var JsonWriter, value: SubnetId)
{.raises: [IOError, Defect].} =
writeValue(writer, distinctBase value)
proc readValue*(reader: var JsonReader, value: var SubnetId)
{.raises: [IOError, SerializationError, Defect].} =
let v = reader.readValue(distinctBase SubnetId)
if v > ATTESTATION_SUBNET_COUNT:
raiseUnexpectedValue(
reader, "Subnet id must be <= " & $ATTESTATION_SUBNET_COUNT)
value = SubnetId(v)
proc writeValue*(writer: var JsonWriter, value: HashList) proc writeValue*(writer: var JsonWriter, value: HashList)
{.raises: [IOError, SerializationError, Defect].} = {.raises: [IOError, SerializationError, Defect].} =
writeValue(writer, value.data) writeValue(writer, value.data)
@ -883,6 +899,9 @@ proc `<`*(x, y: CommitteeIndex) : bool {.borrow, noSideEffect.}
proc hash*(x: CommitteeIndex): Hash {.borrow, noSideEffect.} proc hash*(x: CommitteeIndex): Hash {.borrow, noSideEffect.}
func `$`*(x: CommitteeIndex): auto = $(distinctBase(x)) func `$`*(x: CommitteeIndex): auto = $(distinctBase(x))
proc `==`*(x, y: SubnetId) : bool {.borrow, noSideEffect.}
proc `$`*(x: SubnetId): string {.borrow, noSideEffect.}
func `as`*(d: DepositData, T: type DepositMessage): T = func `as`*(d: DepositData, T: type DepositMessage): T =
T(pubkey: d.pubkey, T(pubkey: d.pubkey,
withdrawal_credentials: d.withdrawal_credentials, withdrawal_credentials: d.withdrawal_credentials,
@ -1138,3 +1157,4 @@ static:
# Sanity checks - these types should be trivial enough to copy with memcpy # Sanity checks - these types should be trivial enough to copy with memcpy
doAssert supportsCopyMem(Validator) doAssert supportsCopyMem(Validator)
doAssert supportsCopyMem(Eth2Digest) doAssert supportsCopyMem(Eth2Digest)
doAssert ATTESTATION_SUBNET_COUNT <= high(distinctBase SubnetId).int

View File

@ -53,7 +53,7 @@ func getAggregateAndProofsTopic*(forkDigest: ForkDigest): string =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#broadcast-attestation # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#broadcast-attestation
func compute_subnet_for_attestation*( func compute_subnet_for_attestation*(
committees_per_slot: uint64, slot: Slot, committee_index: CommitteeIndex): committees_per_slot: uint64, slot: Slot, committee_index: CommitteeIndex):
uint64 = SubnetId =
# Compute the correct subnet for an attestation for Phase 0. # Compute the correct subnet for an attestation for Phase 0.
# Note, this mimics expected Phase 1 behavior where attestations will be # Note, this mimics expected Phase 1 behavior where attestations will be
# mapped to their shard subnet. # mapped to their shard subnet.
@ -62,16 +62,15 @@ func compute_subnet_for_attestation*(
committees_since_epoch_start = committees_since_epoch_start =
committees_per_slot * slots_since_epoch_start committees_per_slot * slots_since_epoch_start
(committees_since_epoch_start + committee_index.uint64) mod SubnetId(
ATTESTATION_SUBNET_COUNT (committees_since_epoch_start + committee_index.uint64) mod
ATTESTATION_SUBNET_COUNT)
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#broadcast-attestation # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#broadcast-attestation
func getAttestationTopic*(forkDigest: ForkDigest, subnetIndex: uint64): func getAttestationTopic*(forkDigest: ForkDigest, subnet_id: SubnetId):
string = string =
## For subscribing and unsubscribing to/from a subnet. ## For subscribing and unsubscribing to/from a subnet.
doAssert subnetIndex < ATTESTATION_SUBNET_COUNT eth2Prefix(forkDigest) & "beacon_attestation_" & $uint64(subnet_id) & "/ssz"
eth2Prefix(forkDigest) & "beacon_attestation_" & $subnetIndex & "/ssz"
func getENRForkID*(fork: Fork, genesis_validators_root: Eth2Digest): ENRForkID = func getENRForkID*(fork: Fork, genesis_validators_root: Eth2Digest): ENRForkID =
let let

View File

@ -281,3 +281,29 @@ func countZeros*(x: BitSeq): int =
template bytes*(x: BitSeq): untyped = template bytes*(x: BitSeq): untyped =
seq[byte](x) seq[byte](x)
iterator items*(x: BitArray): bool =
for i in 0..<x.bits:
yield x[i]
iterator pairs*(x: BitArray): (int, bool) =
for i in 0..<x.bits:
yield (i, x[i])
func incl*(a: var BitArray, b: BitArray) =
# Update `a` to include the bits of `b`, as if applying `or` to each bit
for i in 0..<a.bytes.len:
a[i] = a[i] or b[i]
func clear*(a: var BitArray) =
for b in a.bytes.mitems(): b = 0
# Set operations
func `+`*(a, b: BitArray): BitArray =
for i in 0..<a.bytes.len:
result[i] = a[i] or b[i]
func `-`*(a, b: BitArray): BitArray =
for i in 0..<a.bytes.len:
result[i] = a[i] and (not b[i])

View File

@ -160,19 +160,26 @@ proc isSynced*(node: BeaconNode, head: BlockRef): bool =
true true
proc sendAttestation*( proc sendAttestation*(
node: BeaconNode, attestation: Attestation, num_active_validators: uint64) = node: BeaconNode, attestation: Attestation,
let subnet_index = subnet_id: SubnetId, checkSignature: bool): Future[bool] {.async.} =
compute_subnet_for_attestation( # Validate attestation before sending it via gossip - validation will also
get_committee_count_per_slot(num_active_validators), attestation.data.slot, # register the attestation with the attestation pool. Notably, although
attestation.data.index.CommitteeIndex) # libp2p calls the data handler for any subscription on the subnet
node.network.broadcast( # topic, it does not perform validation.
getAttestationTopic(node.forkDigest, subnet_index), attestation) let ok = await node.processor.attestationValidator(
attestation, subnet_id, checkSignature)
# Ensure node's own broadcast attestations end up in its attestation pool return case ok
discard node.processor.attestationValidator( of ValidationResult.Accept:
attestation, subnet_index, false) node.network.broadcast(
getAttestationTopic(node.forkDigest, subnet_id), attestation)
beacon_attestations_sent.inc() beacon_attestations_sent.inc()
true
else:
notice "Produced attestation failed validation",
attestation = shortLog(attestation),
result = $ok
false
proc sendVoluntaryExit*(node: BeaconNode, exit: SignedVoluntaryExit) = proc sendVoluntaryExit*(node: BeaconNode, exit: SignedVoluntaryExit) =
node.network.broadcast(getVoluntaryExitsTopic(node.forkDigest), exit) node.network.broadcast(getVoluntaryExitsTopic(node.forkDigest), exit)
@ -185,18 +192,21 @@ proc sendProposerSlashing*(node: BeaconNode, slashing: ProposerSlashing) =
node.network.broadcast(getProposerSlashingsTopic(node.forkDigest), node.network.broadcast(getProposerSlashingsTopic(node.forkDigest),
slashing) slashing)
proc sendAttestation*(node: BeaconNode, attestation: Attestation) = proc sendAttestation*(node: BeaconNode, attestation: Attestation): Future[bool] =
# For the validator API, which doesn't supply num_active_validators. # For the validator API, which doesn't supply the subnet id.
let attestationBlck = let attestationBlck =
node.chainDag.getRef(attestation.data.beacon_block_root) node.chainDag.getRef(attestation.data.beacon_block_root)
if attestationBlck.isNil: if attestationBlck.isNil:
debug "Attempt to send attestation without corresponding block" debug "Attempt to send attestation without corresponding block"
return return
let
epochRef = node.chainDag.getEpochRef(
attestationBlck, attestation.data.target.epoch)
subnet_id = compute_subnet_for_attestation(
get_committee_count_per_slot(epochRef), attestation.data.slot,
attestation.data.index.CommitteeIndex)
node.sendAttestation( node.sendAttestation(attestation, subnet_id, checkSignature = true)
attestation,
count_active_validators(
node.chainDag.getEpochRef(attestationBlck, attestation.data.target.epoch)))
proc createAndSendAttestation(node: BeaconNode, proc createAndSendAttestation(node: BeaconNode,
fork: Fork, fork: Fork,
@ -205,31 +215,41 @@ proc createAndSendAttestation(node: BeaconNode,
attestationData: AttestationData, attestationData: AttestationData,
committeeLen: int, committeeLen: int,
indexInCommittee: int, indexInCommittee: int,
num_active_validators: uint64) {.async.} = subnet_id: SubnetId) {.async.} =
var attestation = await validator.produceAndSignAttestation( try:
attestationData, committeeLen, indexInCommittee, fork, var
genesis_validators_root) attestation = await validator.produceAndSignAttestation(
attestationData, committeeLen, indexInCommittee, fork,
genesis_validators_root)
node.sendAttestation(attestation, num_active_validators) let ok = await node.sendAttestation(
attestation, subnet_id, checkSignature = false)
if not ok: # Logged in sendAttestation
return
if node.config.dumpEnabled: let sent = node.beaconClock.now()
dump(node.config.dumpDirOutgoing, attestation.data, validator.pubKey) if node.config.dumpEnabled:
dump(node.config.dumpDirOutgoing, attestation.data, validator.pubKey)
let wallTime = node.beaconClock.now() let wallTime = node.beaconClock.now()
let deadline = attestationData.slot.toBeaconTime() + let deadline = attestationData.slot.toBeaconTime() +
seconds(int(SECONDS_PER_SLOT div 3)) seconds(int(SECONDS_PER_SLOT div 3))
let (delayStr, delaySecs) = let (delayStr, delaySecs) =
if wallTime < deadline: if wallTime < deadline:
("-" & $(deadline - wallTime), -toFloatSeconds(deadline - wallTime)) ("-" & $(deadline - wallTime), -toFloatSeconds(deadline - wallTime))
else: else:
($(wallTime - deadline), toFloatSeconds(wallTime - deadline)) ($(wallTime - deadline), toFloatSeconds(wallTime - deadline))
notice "Attestation sent", attestation = shortLog(attestation), notice "Attestation sent", attestation = shortLog(attestation),
validator = shortLog(validator), delay = delayStr, validator = shortLog(validator), delay = delayStr,
indexInCommittee = indexInCommittee indexInCommittee = indexInCommittee
beacon_attestation_sent_delay.observe(delaySecs) beacon_attestation_sent_delay.observe(delaySecs)
except CatchableError as exc:
# An error could happen here when the signature task fails - we must
# not leak the exception because this is an asyncSpawn task
notice "Error sending attestation", err = exc.msg
proc getBlockProposalEth1Data*(node: BeaconNode, proc getBlockProposalEth1Data*(node: BeaconNode,
stateData: StateData): BlockProposalEth1Data = stateData: StateData): BlockProposalEth1Data =
@ -405,10 +425,6 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
attestationHeadRoot = shortLog(attestationHead.blck.root), attestationHeadRoot = shortLog(attestationHead.blck.root),
attestationSlot = shortLog(slot) attestationSlot = shortLog(slot)
var attestations: seq[tuple[
data: AttestationData, committeeLen, indexInCommittee: int,
validator: AttachedValidator, validator_index: ValidatorIndex]]
# We need to run attestations exactly for the slot that we're attesting to. # We need to run attestations exactly for the slot that we're attesting to.
# In case blocks went missing, this means advancing past the latest block # In case blocks went missing, this means advancing past the latest block
# using empty slots as fillers. # using empty slots as fillers.
@ -416,46 +432,42 @@ proc handleAttestations(node: BeaconNode, head: BlockRef, slot: Slot) =
let let
epochRef = node.chainDag.getEpochRef( epochRef = node.chainDag.getEpochRef(
attestationHead.blck, slot.compute_epoch_at_slot()) attestationHead.blck, slot.compute_epoch_at_slot())
committees_per_slot = committees_per_slot = get_committee_count_per_slot(epochRef)
get_committee_count_per_slot(epochRef)
num_active_validators = count_active_validators(epochRef)
fork = getStateField(node.chainDag.headState, fork) fork = getStateField(node.chainDag.headState, fork)
genesis_validators_root = genesis_validators_root =
getStateField(node.chainDag.headState, genesis_validators_root) getStateField(node.chainDag.headState, genesis_validators_root)
for committee_index in 0'u64..<committees_per_slot: for committee_index in get_committee_indices(epochRef):
let committee = get_beacon_committee( let committee = get_beacon_committee(epochRef, slot, committee_index)
epochRef, slot, committee_index.CommitteeIndex)
for index_in_committee, validator_index in committee: for index_in_committee, validator_index in committee:
let validator = node.getAttachedValidator(epochRef, validator_index) let validator = node.getAttachedValidator(epochRef, validator_index)
if validator != nil: if validator == nil:
let ad = makeAttestationData( continue
epochRef, attestationHead, committee_index.CommitteeIndex)
attestations.add(
(ad, committee.len, index_in_committee, validator, validator_index))
for a in attestations: let
# TODO signing_root is recomputed in produceAndSignAttestation/signAttestation just after data = makeAttestationData(epochRef, attestationHead, committee_index)
let signing_root = compute_attestation_root( # TODO signing_root is recomputed in produceAndSignAttestation/signAttestation just after
fork, genesis_validators_root, a.data) signing_root = compute_attestation_root(
let notSlashable = node.attachedValidators fork, genesis_validators_root, data)
.slashingProtection registered = node.attachedValidators
.registerAttestation( .slashingProtection
a.validator_index, .registerAttestation(
a.validator.pubkey, validator_index,
a.data.source.epoch, validator.pubkey,
a.data.target.epoch, data.source.epoch,
signing_root data.target.epoch,
) signing_root)
if notSlashable.isOk(): if registered.isOk():
traceAsyncErrors createAndSendAttestation( let subnet_id = compute_subnet_for_attestation(
node, fork, genesis_validators_root, a.validator, a.data, committees_per_slot, data.slot, data.index.CommitteeIndex)
a.committeeLen, a.indexInCommittee, num_active_validators) asyncSpawn createAndSendAttestation(
else: node, fork, genesis_validators_root, validator, data,
warn "Slashing protection activated for attestation", committee.len(), index_in_committee, subnet_id)
validator = a.validator.pubkey, else:
badVoteDetails = $notSlashable.error warn "Slashing protection activated for attestation",
validator = validator.pubkey,
badVoteDetails = $registered.error()
proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot): proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
Future[BlockRef] {.async.} = Future[BlockRef] {.async.} =

View File

@ -63,13 +63,9 @@ proc signWithRemoteValidator(v: AttachedValidator, data: Eth2Digest):
v.connection.inStream.flush() v.connection.inStream.flush()
var line = newStringOfCap(120).TaintedString var line = newStringOfCap(120).TaintedString
discard v.connection.outStream.readLine(line) discard v.connection.outStream.readLine(line)
result = ValidatorSig.fromHex(line).get() return ValidatorSig.fromHex(line).get()
# TODO this is an ugly hack to fake a delay and subsequent async reordering
# for the purpose of testing the external validator delay - to be
# replaced by something more sensible
await sleepAsync(chronos.milliseconds(1))
# TODO: Honest validator - https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md
proc signBlockProposal*(v: AttachedValidator, fork: Fork, proc signBlockProposal*(v: AttachedValidator, fork: Fork,
genesis_validators_root: Eth2Digest, slot: Slot, genesis_validators_root: Eth2Digest, slot: Slot,
blockRoot: Eth2Digest): Future[ValidatorSig] {.async.} = blockRoot: Eth2Digest): Future[ValidatorSig] {.async.} =
@ -81,14 +77,14 @@ proc signBlockProposal*(v: AttachedValidator, fork: Fork,
await signWithRemoteValidator(v, root) await signWithRemoteValidator(v, root)
proc signAttestation*(v: AttachedValidator, proc signAttestation*(v: AttachedValidator,
attestation: AttestationData, data: AttestationData,
fork: Fork, genesis_validators_root: Eth2Digest): fork: Fork, genesis_validators_root: Eth2Digest):
Future[ValidatorSig] {.async.} = Future[ValidatorSig] {.async.} =
return if v.kind == inProcess: return if v.kind == inProcess:
get_attestation_signature( get_attestation_signature(
fork, genesis_validators_root, attestation, v.privKey).toValidatorSig() fork, genesis_validators_root, data, v.privKey).toValidatorSig()
else: else:
let root = compute_attestation_root(fork, genesis_validators_root, attestation) let root = compute_attestation_root(fork, genesis_validators_root, data)
await signWithRemoteValidator(v, root) await signWithRemoteValidator(v, root)
proc produceAndSignAttestation*(validator: AttachedValidator, proc produceAndSignAttestation*(validator: AttachedValidator,

View File

@ -194,7 +194,7 @@ func getTopics(forkDigest: ForkDigest,
of TopicFilter.Attestations: of TopicFilter.Attestations:
mapIt( mapIt(
0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64, 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64,
getAttestationTopic(forkDigest, it) & "_snappy") getAttestationTopic(forkDigest, SubnetId(it)) & "_snappy")
proc loadBootFile(name: string): seq[string] = proc loadBootFile(name: string): seq[string] =
try: try:

View File

@ -7,7 +7,45 @@ import
./testutil ./testutil
suite "Bit fields": suite "Bit fields":
test "roundtrips": test "roundtrips BitArray":
var
a = BitArray[100]()
b = BitArray[100]()
check:
not a[0]
a.setBit 1
check:
not a[0]
a[1]
a + b == a
a - b == a
b + a == a
b - a == b # b is empty
b.setBit 2
check:
(a + b)[2]
(b - a)[2]
not (b - a)[1]
a.incl(b)
check:
not a[0]
a[1]
a[2]
a.clear()
check:
not a[1]
test "roundtrips BitSeq":
var var
a = BitSeq.init(100) a = BitSeq.init(100)
b = BitSeq.init(100) b = BitSeq.init(100)

View File

@ -92,13 +92,14 @@ suite "Gossip validation " & preset():
pool[].nextAttestationEpoch.setLen(0) # reset for test pool[].nextAttestationEpoch.setLen(0) # reset for test
check: check:
# Wrong subnet # Wrong subnet
validateAttestation(pool, batchCrypto, att_1_0, beaconTime, subnet + 1, true).waitFor().isErr validateAttestation(
pool, batchCrypto, att_1_0, beaconTime, SubnetId(subnet.uint8 + 1), true).waitFor().isErr
pool[].nextAttestationEpoch.setLen(0) # reset for test pool[].nextAttestationEpoch.setLen(0) # reset for test
check: check:
# Too far in the future # Too far in the future
validateAttestation( validateAttestation(
pool, batchCrypto, att_1_0, beaconTime - 1.seconds, subnet + 1, true).waitFor().isErr pool, batchCrypto, att_1_0, beaconTime - 1.seconds, subnet, true).waitFor().isErr
pool[].nextAttestationEpoch.setLen(0) # reset for test pool[].nextAttestationEpoch.setLen(0) # reset for test
check: check:
@ -106,7 +107,7 @@ suite "Gossip validation " & preset():
validateAttestation( validateAttestation(
pool, batchCrypto, att_1_0, pool, batchCrypto, att_1_0,
beaconTime - (SECONDS_PER_SLOT * SLOTS_PER_EPOCH - 1).int.seconds, beaconTime - (SECONDS_PER_SLOT * SLOTS_PER_EPOCH - 1).int.seconds,
subnet + 1, true).waitFor().isErr subnet, true).waitFor().isErr
block: block:
var broken = att_1_0 var broken = att_1_0

View File

@ -19,45 +19,45 @@ suite "Honest validator":
test "Mainnet attestation topics": test "Mainnet attestation topics":
check: check:
getAttestationTopic(forkDigest, 0) == getAttestationTopic(forkDigest, SubnetId(0)) ==
"/eth2/00000000/beacon_attestation_0/ssz" "/eth2/00000000/beacon_attestation_0/ssz"
getAttestationTopic(forkDigest, 5) == getAttestationTopic(forkDigest, SubnetId(5)) ==
"/eth2/00000000/beacon_attestation_5/ssz" "/eth2/00000000/beacon_attestation_5/ssz"
getAttestationTopic(forkDigest, 7) == getAttestationTopic(forkDigest, SubnetId(7)) ==
"/eth2/00000000/beacon_attestation_7/ssz" "/eth2/00000000/beacon_attestation_7/ssz"
getAttestationTopic(forkDigest, 9) == getAttestationTopic(forkDigest, SubnetId(9)) ==
"/eth2/00000000/beacon_attestation_9/ssz" "/eth2/00000000/beacon_attestation_9/ssz"
getAttestationTopic(forkDigest, 13) == getAttestationTopic(forkDigest, SubnetId(13)) ==
"/eth2/00000000/beacon_attestation_13/ssz" "/eth2/00000000/beacon_attestation_13/ssz"
getAttestationTopic(forkDigest, 19) == getAttestationTopic(forkDigest, SubnetId(19)) ==
"/eth2/00000000/beacon_attestation_19/ssz" "/eth2/00000000/beacon_attestation_19/ssz"
getAttestationTopic(forkDigest, 20) == getAttestationTopic(forkDigest, SubnetId(20)) ==
"/eth2/00000000/beacon_attestation_20/ssz" "/eth2/00000000/beacon_attestation_20/ssz"
getAttestationTopic(forkDigest, 22) == getAttestationTopic(forkDigest, SubnetId(22)) ==
"/eth2/00000000/beacon_attestation_22/ssz" "/eth2/00000000/beacon_attestation_22/ssz"
getAttestationTopic(forkDigest, 25) == getAttestationTopic(forkDigest, SubnetId(25)) ==
"/eth2/00000000/beacon_attestation_25/ssz" "/eth2/00000000/beacon_attestation_25/ssz"
getAttestationTopic(forkDigest, 27) == getAttestationTopic(forkDigest, SubnetId(27)) ==
"/eth2/00000000/beacon_attestation_27/ssz" "/eth2/00000000/beacon_attestation_27/ssz"
getAttestationTopic(forkDigest, 31) == getAttestationTopic(forkDigest, SubnetId(31)) ==
"/eth2/00000000/beacon_attestation_31/ssz" "/eth2/00000000/beacon_attestation_31/ssz"
getAttestationTopic(forkDigest, 39) == getAttestationTopic(forkDigest, SubnetId(39)) ==
"/eth2/00000000/beacon_attestation_39/ssz" "/eth2/00000000/beacon_attestation_39/ssz"
getAttestationTopic(forkDigest, 45) == getAttestationTopic(forkDigest, SubnetId(45)) ==
"/eth2/00000000/beacon_attestation_45/ssz" "/eth2/00000000/beacon_attestation_45/ssz"
getAttestationTopic(forkDigest, 47) == getAttestationTopic(forkDigest, SubnetId(47)) ==
"/eth2/00000000/beacon_attestation_47/ssz" "/eth2/00000000/beacon_attestation_47/ssz"
getAttestationTopic(forkDigest, 48) == getAttestationTopic(forkDigest, SubnetId(48)) ==
"/eth2/00000000/beacon_attestation_48/ssz" "/eth2/00000000/beacon_attestation_48/ssz"
getAttestationTopic(forkDigest, 50) == getAttestationTopic(forkDigest, SubnetId(50)) ==
"/eth2/00000000/beacon_attestation_50/ssz" "/eth2/00000000/beacon_attestation_50/ssz"
getAttestationTopic(forkDigest, 53) == getAttestationTopic(forkDigest, SubnetId(53)) ==
"/eth2/00000000/beacon_attestation_53/ssz" "/eth2/00000000/beacon_attestation_53/ssz"
getAttestationTopic(forkDigest, 54) == getAttestationTopic(forkDigest, SubnetId(54)) ==
"/eth2/00000000/beacon_attestation_54/ssz" "/eth2/00000000/beacon_attestation_54/ssz"
getAttestationTopic(forkDigest, 62) == getAttestationTopic(forkDigest, SubnetId(62)) ==
"/eth2/00000000/beacon_attestation_62/ssz" "/eth2/00000000/beacon_attestation_62/ssz"
getAttestationTopic(forkDigest, 63) == getAttestationTopic(forkDigest, SubnetId(63)) ==
"/eth2/00000000/beacon_attestation_63/ssz" "/eth2/00000000/beacon_attestation_63/ssz"
test "is_aggregator": test "is_aggregator":