diff --git a/beacon_chain/beacon_node_types.nim b/beacon_chain/beacon_node_types.nim index dc506de4b..433f60e74 100644 --- a/beacon_chain/beacon_node_types.nim +++ b/beacon_chain/beacon_node_types.nim @@ -135,11 +135,15 @@ type AttestationSubnets* = object enabled*: bool - stabilitySubnets*: seq[tuple[subnet_id: SubnetId, expiration: Epoch]] + stabilitySubnets*: seq[tuple[subnet_id: SubnetId, expiration: Epoch]] ##\ + ## The subnets on which we listen and broadcast gossip traffic to maintain + ## the health of the network - these are advertised in the ENR nextCycleEpoch*: Epoch # These encode states in per-subnet state machines - subscribedSubnets*: BitArray[ATTESTATION_SUBNET_COUNT] + aggregateSubnets*: BitArray[ATTESTATION_SUBNET_COUNT] ##\ + ## The subnets on which we listen for attestations in order to produce + ## aggregates subscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot] unsubscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot] diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index aca28b50c..c00de56e9 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -426,7 +426,7 @@ func verifyFinalization(node: BeaconNode, slot: Slot) = # finalization occurs every slot, to 4 slots vs scheduledSlot. doAssert finalizedEpoch + 4 >= epoch -func getStabilitySubnets(stabilitySubnets: auto): BitArray[ATTESTATION_SUBNET_COUNT] = +func toBitArray(stabilitySubnets: auto): BitArray[ATTESTATION_SUBNET_COUNT] = for subnetInfo in stabilitySubnets: result[subnetInfo.subnet_id.int] = true @@ -507,8 +507,13 @@ proc updateSubscriptionSchedule(node: BeaconNode, epoch: Epoch) {.async.} = node.attestationSubnets.unsubscribeSlot[subnet_id.uint64] = max(slot + 1, node.attestationSubnets.unsubscribeSlot[subnet_id.uint64]) - if node.attestationSubnets.subscribedSubnets[subnet_id.uint64]: - const SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS = 34 + if not node.attestationSubnets.aggregateSubnets[subnet_id.uint64]: + # The lead time here allows for the gossip mesh to stabilise well before + # attestations start flowing on the channel - the downside of a long lead + # time is that we waste bandwidth and CPU on traffic we're not strictly + # interested in - it could potentially be decreased, specially when peers + # are selected based on their stability subnet connectivity + const SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS = 6 node.attestationSubnets.subscribeSlot[subnet_id.uint64] = # Queue upcoming subscription potentially earlier @@ -572,35 +577,37 @@ proc cycleAttestationSubnetsPerEpoch( return stabilitySubnets +func subnetLog(v: BitArray): string = + $toSeq(v.oneIndices()) + proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} = static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1 doAssert not node.config.subscribeAllSubnets - let prevSubscribedSubnets = node.attestationSubnets.subscribedSubnets - - for i in 0..= node.attestationSubnets.unsubscribeSlot[i]: - node.attestationSubnets.subscribedSubnets[i] = false - else: - if wallSlot >= node.attestationSubnets.subscribeSlot[i]: - node.attestationSubnets.subscribedSubnets[i] = true - let prevStabilitySubnets = - node.attestationSubnets.stabilitySubnets.getStabilitySubnets() + node.attestationSubnets.stabilitySubnets.toBitArray() stabilitySubnets = await node.cycleAttestationSubnetsPerEpoch(wallSlot, prevStabilitySubnets) + let prevAggregateSubnets = node.attestationSubnets.aggregateSubnets + + for i in 0..= node.attestationSubnets.unsubscribeSlot[i]: + node.attestationSubnets.aggregateSubnets[i] = false + else: + if wallSlot >= node.attestationSubnets.subscribeSlot[i]: + node.attestationSubnets.aggregateSubnets[i] = true + # Accounting specific to non-stability subnets - for i, enabled in - (prevSubscribedSubnets - node.attestationSubnets.subscribedSubnets): - if enabled: - node.attestationSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT + for i in (prevAggregateSubnets - node.attestationSubnets.aggregateSubnets). + oneIndices(): + node.attestationSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT let - prevAllSubnets = prevSubscribedSubnets + prevStabilitySubnets - allSubnets = node.attestationSubnets.subscribedSubnets + stabilitySubnets + prevAllSubnets = prevAggregateSubnets + prevStabilitySubnets + allSubnets = node.attestationSubnets.aggregateSubnets + stabilitySubnets unsubscribeSubnets = prevAllSubnets - allSubnets subscribeSubnets = allSubnets - prevAllSubnets @@ -608,25 +615,21 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} = node.network.subscribeAttestationSubnets(subscribeSubnets) debug "Attestation subnets", - expiringSubnets = - prevSubscribedSubnets - node.attestationSubnets.subscribedSubnets, - subnets = node.attestationSubnets.subscribedSubnets, - newSubnets = - node.attestationSubnets.subscribedSubnets - prevSubscribedSubnets, wallSlot, wallEpoch = wallSlot.epoch, - num_stability_subnets = node.attestationSubnets.stabilitySubnets.len, - expiring_stability_subnets = prevStabilitySubnets - stabilitySubnets, - new_stability_subnets = stabilitySubnets - prevStabilitySubnets, - subscribeSubnets, - unsubscribeSubnets + prevAggregateSubnets = subnetLog(prevAggregateSubnets), + aggregateSubnets = subnetLog(node.attestationSubnets.aggregateSubnets), + prevStabilitySubnets = subnetLog(prevStabilitySubnets), + stabilitySubnets = subnetLog(stabilitySubnets), + subscribeSubnets = subnetLog(subscribeSubnets), + unsubscribeSubnets = subnetLog(unsubscribeSubnets) -proc getInitialAttestationSubnets(node: BeaconNode): Table[SubnetId, Slot] = +proc getInitialAggregateSubnets(node: BeaconNode): Table[SubnetId, Slot] = let wallEpoch = node.beaconClock.now().slotOrZero().epoch validatorIndices = toIntSet(toSeq(node.getAttachedValidators().keys())) - template mergeAttestationSubnets(epoch: Epoch) = + template mergeAggregateSubnets(epoch: Epoch) = # TODO when https://github.com/nim-lang/Nim/issues/15972 and # https://github.com/nim-lang/Nim/issues/16217 are fixed, in # Nimbus's Nim, use (_, _, subnetIndex, slot). @@ -643,8 +646,8 @@ proc getInitialAttestationSubnets(node: BeaconNode): Table[SubnetId, Slot] = # epoch 0 for real, in which case both are also already known; or wallEpoch # is greater than 0, in which case it's being called from onSlotStart which # has enough state to calculate wallEpoch + {0,1}'s attestations. - mergeAttestationSubnets(wallEpoch) - mergeAttestationSubnets(wallEpoch + 1) + mergeAggregateSubnets(wallEpoch) + mergeAggregateSubnets(wallEpoch + 1) proc subscribeAttestationSubnetHandlers(node: BeaconNode) {. raises: [Defect, CatchableError].} = @@ -673,28 +676,28 @@ proc subscribeAttestationSubnetHandlers(node: BeaconNode) {. ss.subnet_id = node.network.getRandomSubnetId() ss.expiration = wallEpoch + node.network.getStabilitySubnetLength() - let initialStabilitySubnets = - node.attestationSubnets.stabilitySubnets.getStabilitySubnets() - node.network.updateStabilitySubnetMetadata(initialStabilitySubnets) + let stabilitySubnets = + node.attestationSubnets.stabilitySubnets.toBitArray() + node.network.updateStabilitySubnetMetadata(stabilitySubnets) let - initialSubnets = node.getInitialAttestationSubnets() + aggregateSubnets = node.getInitialAggregateSubnets() for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT: - if SubnetId(i) in initialSubnets: - node.attestationSubnets.subscribedSubnets[i] = true + if SubnetId(i) in aggregateSubnets: + node.attestationSubnets.aggregateSubnets[i] = true node.attestationSubnets.unsubscribeSlot[i] = - try: initialSubnets[SubnetId(i)] except KeyError: raiseAssert "checked with in" + try: aggregateSubnets[SubnetId(i)] except KeyError: raiseAssert "checked with in" else: - node.attestationSubnets.subscribedSubnets[i] = false + node.attestationSubnets.aggregateSubnets[i] = false node.attestationSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT node.attestationSubnets.enabled = true debug "Initial attestation subnets subscribed", - initialSubnets, - initialStabilitySubnets + aggregateSubnets = subnetLog(node.attestationSubnets.aggregateSubnets), + stabilitySubnets = subnetLog(stabilitySubnets) node.network.subscribeAttestationSubnets( - node.attestationSubnets.subscribedSubnets + initialStabilitySubnets) + node.attestationSubnets.aggregateSubnets + stabilitySubnets) proc addMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} = # inspired by lighthouse research here diff --git a/beacon_chain/rpc/validator_api.nim b/beacon_chain/rpc/validator_api.nim index 8194008ee..ed901060d 100644 --- a/beacon_chain/rpc/validator_api.nim +++ b/beacon_chain/rpc/validator_api.nim @@ -158,7 +158,7 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {. # ahead one can check for attestation schedule is that it might be used # for up to the end of next epoch. Therefore, arrange for subscriptions # to last at least that long. - if node.attestationSubnets.subscribedSubnets[subnet_id.uint64]: + if not node.attestationSubnets.aggregateSubnets[subnet_id.uint64]: # When to subscribe. Since it's not clear when from the API it's first # needed, do so immediately. node.attestationSubnets.subscribeSlot[subnet_id.uint64] = diff --git a/beacon_chain/ssz/bitseqs.nim b/beacon_chain/ssz/bitseqs.nim index 823c003d8..13db25d1a 100644 --- a/beacon_chain/ssz/bitseqs.nim +++ b/beacon_chain/ssz/bitseqs.nim @@ -228,7 +228,7 @@ template cmp*(a, b: BitSeq): int = template `==`*(a, b: BitSeq): bool = cmp(a, b) == 0 -func `$`*(a: BitSeq): string = +func `$`*(a: BitSeq | BitArray): string = let length = a.len result = newStringOfCap(2 + length) result.add "0b" @@ -301,9 +301,13 @@ func clear*(a: var BitArray) = # Set operations func `+`*(a, b: BitArray): BitArray = for i in 0..