fix subnet logic (#2555)

This PR decreases the lead subscription time which should help
decrease bandwidth usage and CPU making the subscription for future
aggregation happen a bit later. There's room for more tuning here,
probably.

* fix missing negation from in #2550
* fix silly bitarray issues
* decrease subnet lead subscription time
* log all subnet switching source data
* rename subnet trackers to refer to stability and aggregate subnets
* more tests
This commit is contained in:
Jacek Sieka 2021-05-11 22:03:40 +02:00 committed by GitHub
parent 149ff49c8e
commit 0a477eb2d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 76 additions and 53 deletions

View File

@ -135,11 +135,15 @@ type
AttestationSubnets* = object
enabled*: bool
stabilitySubnets*: seq[tuple[subnet_id: SubnetId, expiration: Epoch]]
stabilitySubnets*: seq[tuple[subnet_id: SubnetId, expiration: Epoch]] ##\
## The subnets on which we listen and broadcast gossip traffic to maintain
## the health of the network - these are advertised in the ENR
nextCycleEpoch*: Epoch
# These encode states in per-subnet state machines
subscribedSubnets*: BitArray[ATTESTATION_SUBNET_COUNT]
aggregateSubnets*: BitArray[ATTESTATION_SUBNET_COUNT] ##\
## The subnets on which we listen for attestations in order to produce
## aggregates
subscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot]
unsubscribeSlot*: array[ATTESTATION_SUBNET_COUNT, Slot]

View File

@ -426,7 +426,7 @@ func verifyFinalization(node: BeaconNode, slot: Slot) =
# finalization occurs every slot, to 4 slots vs scheduledSlot.
doAssert finalizedEpoch + 4 >= epoch
func getStabilitySubnets(stabilitySubnets: auto): BitArray[ATTESTATION_SUBNET_COUNT] =
func toBitArray(stabilitySubnets: auto): BitArray[ATTESTATION_SUBNET_COUNT] =
for subnetInfo in stabilitySubnets:
result[subnetInfo.subnet_id.int] = true
@ -507,8 +507,13 @@ proc updateSubscriptionSchedule(node: BeaconNode, epoch: Epoch) {.async.} =
node.attestationSubnets.unsubscribeSlot[subnet_id.uint64] =
max(slot + 1, node.attestationSubnets.unsubscribeSlot[subnet_id.uint64])
if node.attestationSubnets.subscribedSubnets[subnet_id.uint64]:
const SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS = 34
if not node.attestationSubnets.aggregateSubnets[subnet_id.uint64]:
# The lead time here allows for the gossip mesh to stabilise well before
# attestations start flowing on the channel - the downside of a long lead
# time is that we waste bandwidth and CPU on traffic we're not strictly
# interested in - it could potentially be decreased, specially when peers
# are selected based on their stability subnet connectivity
const SUBNET_SUBSCRIPTION_LEAD_TIME_SLOTS = 6
node.attestationSubnets.subscribeSlot[subnet_id.uint64] =
# Queue upcoming subscription potentially earlier
@ -572,35 +577,37 @@ proc cycleAttestationSubnetsPerEpoch(
return stabilitySubnets
func subnetLog(v: BitArray): string =
$toSeq(v.oneIndices())
proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} =
static: doAssert RANDOM_SUBNETS_PER_VALIDATOR == 1
doAssert not node.config.subscribeAllSubnets
let prevSubscribedSubnets = node.attestationSubnets.subscribedSubnets
for i in 0..<node.attestationSubnets.subscribedSubnets.len():
if node.attestationSubnets.subscribedSubnets[i]:
if wallSlot >= node.attestationSubnets.unsubscribeSlot[i]:
node.attestationSubnets.subscribedSubnets[i] = false
else:
if wallSlot >= node.attestationSubnets.subscribeSlot[i]:
node.attestationSubnets.subscribedSubnets[i] = true
let
prevStabilitySubnets =
node.attestationSubnets.stabilitySubnets.getStabilitySubnets()
node.attestationSubnets.stabilitySubnets.toBitArray()
stabilitySubnets =
await node.cycleAttestationSubnetsPerEpoch(wallSlot, prevStabilitySubnets)
let prevAggregateSubnets = node.attestationSubnets.aggregateSubnets
for i in 0..<node.attestationSubnets.aggregateSubnets.len():
if node.attestationSubnets.aggregateSubnets[i]:
if wallSlot >= node.attestationSubnets.unsubscribeSlot[i]:
node.attestationSubnets.aggregateSubnets[i] = false
else:
if wallSlot >= node.attestationSubnets.subscribeSlot[i]:
node.attestationSubnets.aggregateSubnets[i] = true
# Accounting specific to non-stability subnets
for i, enabled in
(prevSubscribedSubnets - node.attestationSubnets.subscribedSubnets):
if enabled:
for i in (prevAggregateSubnets - node.attestationSubnets.aggregateSubnets).
oneIndices():
node.attestationSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT
let
prevAllSubnets = prevSubscribedSubnets + prevStabilitySubnets
allSubnets = node.attestationSubnets.subscribedSubnets + stabilitySubnets
prevAllSubnets = prevAggregateSubnets + prevStabilitySubnets
allSubnets = node.attestationSubnets.aggregateSubnets + stabilitySubnets
unsubscribeSubnets = prevAllSubnets - allSubnets
subscribeSubnets = allSubnets - prevAllSubnets
@ -608,25 +615,21 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} =
node.network.subscribeAttestationSubnets(subscribeSubnets)
debug "Attestation subnets",
expiringSubnets =
prevSubscribedSubnets - node.attestationSubnets.subscribedSubnets,
subnets = node.attestationSubnets.subscribedSubnets,
newSubnets =
node.attestationSubnets.subscribedSubnets - prevSubscribedSubnets,
wallSlot,
wallEpoch = wallSlot.epoch,
num_stability_subnets = node.attestationSubnets.stabilitySubnets.len,
expiring_stability_subnets = prevStabilitySubnets - stabilitySubnets,
new_stability_subnets = stabilitySubnets - prevStabilitySubnets,
subscribeSubnets,
unsubscribeSubnets
prevAggregateSubnets = subnetLog(prevAggregateSubnets),
aggregateSubnets = subnetLog(node.attestationSubnets.aggregateSubnets),
prevStabilitySubnets = subnetLog(prevStabilitySubnets),
stabilitySubnets = subnetLog(stabilitySubnets),
subscribeSubnets = subnetLog(subscribeSubnets),
unsubscribeSubnets = subnetLog(unsubscribeSubnets)
proc getInitialAttestationSubnets(node: BeaconNode): Table[SubnetId, Slot] =
proc getInitialAggregateSubnets(node: BeaconNode): Table[SubnetId, Slot] =
let
wallEpoch = node.beaconClock.now().slotOrZero().epoch
validatorIndices = toIntSet(toSeq(node.getAttachedValidators().keys()))
template mergeAttestationSubnets(epoch: Epoch) =
template mergeAggregateSubnets(epoch: Epoch) =
# TODO when https://github.com/nim-lang/Nim/issues/15972 and
# https://github.com/nim-lang/Nim/issues/16217 are fixed, in
# Nimbus's Nim, use (_, _, subnetIndex, slot).
@ -643,8 +646,8 @@ proc getInitialAttestationSubnets(node: BeaconNode): Table[SubnetId, Slot] =
# epoch 0 for real, in which case both are also already known; or wallEpoch
# is greater than 0, in which case it's being called from onSlotStart which
# has enough state to calculate wallEpoch + {0,1}'s attestations.
mergeAttestationSubnets(wallEpoch)
mergeAttestationSubnets(wallEpoch + 1)
mergeAggregateSubnets(wallEpoch)
mergeAggregateSubnets(wallEpoch + 1)
proc subscribeAttestationSubnetHandlers(node: BeaconNode) {.
raises: [Defect, CatchableError].} =
@ -673,28 +676,28 @@ proc subscribeAttestationSubnetHandlers(node: BeaconNode) {.
ss.subnet_id = node.network.getRandomSubnetId()
ss.expiration = wallEpoch + node.network.getStabilitySubnetLength()
let initialStabilitySubnets =
node.attestationSubnets.stabilitySubnets.getStabilitySubnets()
node.network.updateStabilitySubnetMetadata(initialStabilitySubnets)
let stabilitySubnets =
node.attestationSubnets.stabilitySubnets.toBitArray()
node.network.updateStabilitySubnetMetadata(stabilitySubnets)
let
initialSubnets = node.getInitialAttestationSubnets()
aggregateSubnets = node.getInitialAggregateSubnets()
for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
if SubnetId(i) in initialSubnets:
node.attestationSubnets.subscribedSubnets[i] = true
if SubnetId(i) in aggregateSubnets:
node.attestationSubnets.aggregateSubnets[i] = true
node.attestationSubnets.unsubscribeSlot[i] =
try: initialSubnets[SubnetId(i)] except KeyError: raiseAssert "checked with in"
try: aggregateSubnets[SubnetId(i)] except KeyError: raiseAssert "checked with in"
else:
node.attestationSubnets.subscribedSubnets[i] = false
node.attestationSubnets.aggregateSubnets[i] = false
node.attestationSubnets.subscribeSlot[i] = FAR_FUTURE_SLOT
node.attestationSubnets.enabled = true
debug "Initial attestation subnets subscribed",
initialSubnets,
initialStabilitySubnets
aggregateSubnets = subnetLog(node.attestationSubnets.aggregateSubnets),
stabilitySubnets = subnetLog(stabilitySubnets)
node.network.subscribeAttestationSubnets(
node.attestationSubnets.subscribedSubnets + initialStabilitySubnets)
node.attestationSubnets.aggregateSubnets + stabilitySubnets)
proc addMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} =
# inspired by lighthouse research here

View File

@ -158,7 +158,7 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
# ahead one can check for attestation schedule is that it might be used
# for up to the end of next epoch. Therefore, arrange for subscriptions
# to last at least that long.
if node.attestationSubnets.subscribedSubnets[subnet_id.uint64]:
if not node.attestationSubnets.aggregateSubnets[subnet_id.uint64]:
# When to subscribe. Since it's not clear when from the API it's first
# needed, do so immediately.
node.attestationSubnets.subscribeSlot[subnet_id.uint64] =

View File

@ -228,7 +228,7 @@ template cmp*(a, b: BitSeq): int =
template `==`*(a, b: BitSeq): bool =
cmp(a, b) == 0
func `$`*(a: BitSeq): string =
func `$`*(a: BitSeq | BitArray): string =
let length = a.len
result = newStringOfCap(2 + length)
result.add "0b"
@ -301,9 +301,13 @@ func clear*(a: var BitArray) =
# Set operations
func `+`*(a, b: BitArray): BitArray =
for i in 0..<a.bytes.len:
result[i] = a[i] or b[i]
result.bytes[i] = a.bytes[i] or b.bytes[i]
func `-`*(a, b: BitArray): BitArray =
for i in 0..<a.bytes.len:
result[i] = a[i] and (not b[i])
result.bytes[i] = a.bytes[i] and (not b.bytes[i])
iterator oneIndices*(a: BitArray): int =
for i in 0..<a.len:
if a[i]: yield i

View File

@ -2,7 +2,7 @@
import
unittest2,
strformat,
std/[sequtils, strformat],
../beacon_chain/ssz/bitseqs,
./testutil
@ -11,6 +11,7 @@ suite "Bit fields":
var
a = BitArray[100]()
b = BitArray[100]()
c = BitArray[100]()
check:
not a[0]
@ -20,12 +21,14 @@ suite "Bit fields":
check:
not a[0]
a[1]
toSeq(a.oneIndices()) == [1]
a + b == a
a - b == a
a - a == c # empty
b + a == a
b - a == b # b is empty
b - b == c # b is empty
b.setBit 2
@ -34,6 +37,15 @@ suite "Bit fields":
(b - a)[2]
not (b - a)[1]
a.setBit 99
check:
(a + b)[99]
(b - a)[2]
not (b - a)[1]
not (b - a)[99]
toSeq(a.oneIndices()) == [1, 99]
a.incl(b)
check: