The peer cycling takes into accounts the syncnets ENR fields

This commit is contained in:
Zahary Karadjov 2021-09-22 01:25:49 +03:00 committed by zah
parent 855d0257ff
commit 02372849f1
2 changed files with 76 additions and 43 deletions

View File

@ -899,9 +899,11 @@ proc toPeerAddr(node: Node): Result[PeerAddr, cstring] =
let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol)
ok(peerAddr)
proc queryRandom*(d: Eth2DiscoveryProtocol, forkId: ENRForkID,
attnets: BitArray[ATTESTATION_SUBNET_COUNT]):
Future[seq[Node]] {.async.} =
proc queryRandom*(
d: Eth2DiscoveryProtocol,
forkId: ENRForkID,
wantedAttnets: BitArray[ATTESTATION_SUBNET_COUNT],
wantedSyncnets: BitArray[SYNC_COMMITTEE_SUBNET_COUNT]): Future[seq[Node]] {.async.} =
## Perform a discovery query for a random target
## (forkId) and matching at least one of the attestation subnets.
@ -910,24 +912,38 @@ proc queryRandom*(d: Eth2DiscoveryProtocol, forkId: ENRForkID,
var filtered: seq[(int, Node)]
for n in nodes:
let res = n.record.tryGet(enrAttestationSubnetsField, seq[byte])
var score: int = 0
if res.isSome():
let attnetsBytes = n.record.tryGet(enrAttestationSubnetsField, seq[byte])
if attnetsBytes.isSome():
let attnetsNode =
try:
SSZ.decode(res.get(), BitArray[ATTESTATION_SUBNET_COUNT])
SSZ.decode(attnetsBytes.get(), BitArray[ATTESTATION_SUBNET_COUNT])
except SszError as e:
debug "Could not decode attestation subnet bitfield of peer",
debug "Could not decode the attnets ERN bitfield of peer",
peer = n.record.toURI(), exception = e.name, msg = e.msg
continue
var score: int = 0
for i in 0..<ATTESTATION_SUBNET_COUNT:
if attnets[i] and attnetsNode[i]:
inc score
if wantedAttnets[i] and attnetsNode[i]:
score += 1
if score > 0:
filtered.add((score, n))
let syncnetsBytes = n.record.tryGet(enrSyncSubnetsField, seq[byte])
if syncnetsBytes.isSome():
let syncnetsNode =
try:
SSZ.decode(syncnetsBytes.get(), BitArray[SYNC_COMMITTEE_SUBNET_COUNT])
except SszError as e:
debug "Could not decode the syncnets ENR bitfield of peer",
peer = n.record.toURI(), exception = e.name, msg = e.msg
continue
for i in 0..<SYNC_COMMITTEE_SUBNET_COUNT:
if wantedSyncnets[i] and syncnetsNode[i]:
score += 10 # connecting to the right syncnet is urgent
if score > 0:
filtered.add((score, n))
d.rng[].shuffle(filtered)
return filtered.sortedByIt(it[0]).mapIt(it[1])
@ -990,54 +1006,64 @@ proc trimConnections(node: Eth2Node, count: int) {.async.} =
dec toKick
if toKick <= 0: return
proc getLowAttnets(node: Eth2Node): BitArray[ATTESTATION_SUBNET_COUNT] =
proc getLowSubnets(node: Eth2Node):
(BitArray[ATTESTATION_SUBNET_COUNT],
BitArray[SYNC_COMMITTEE_SUBNET_COUNT]) =
# Returns the subnets required to have a healthy mesh
# The subnets are computed, to, in order:
# - Have 0 subscribed subnet below `dLow`
# - Have 0 subnet with < `d` peers from topic subscription
# - Have 0 subscribed subnet below `dOut` outgoing peers
var
lowOutgoingSubnets: BitArray[ATTESTATION_SUBNET_COUNT]
belowDLowSubnets: BitArray[ATTESTATION_SUBNET_COUNT]
belowDOutSubnets: BitArray[ATTESTATION_SUBNET_COUNT]
template findLowSubnets(topicNameGenerator: untyped,
SubnetIdType: type,
totalSubnets: static int): auto =
var
lowOutgoingSubnets: BitArray[totalSubnets]
belowDLowSubnets: BitArray[totalSubnets]
belowDOutSubnets: BitArray[totalSubnets]
for subNetId in 0..<ATTESTATION_SUBNET_COUNT:
let topic =
getAttestationTopic(node.forkId.forkDigest, SubnetId(subNetId)) & "_snappy"
for subNetId in 0 ..< totalSubnets:
let topic =
topicNameGenerator(node.forkId.forkDigest, SubnetIdType(subNetId)) & "_snappy"
if node.pubsub.gossipsub.peers(topic) < node.pubsub.parameters.d:
lowOutgoingSubnets.setBit(subNetId)
if node.pubsub.gossipsub.peers(topic) < node.pubsub.parameters.d:
lowOutgoingSubnets.setBit(subNetId)
# Not subscribed
if topic notin node.pubsub.mesh: continue
# Not subscribed
if topic notin node.pubsub.mesh: continue
if node.pubsub.mesh.peers(topic) < node.pubsub.parameters.dLow:
belowDlowSubnets.setBit(subNetId)
if node.pubsub.mesh.peers(topic) < node.pubsub.parameters.dLow:
belowDlowSubnets.setBit(subNetId)
let outPeers = node.pubsub.mesh.getOrDefault(topic).toSeq().filterIt(it.outbound)
if outPeers.len() < node.pubsub.parameters.dOut:
belowDOutSubnets.setBit(subNetId)
let outPeers = node.pubsub.mesh.getOrDefault(topic).toSeq().filterIt(it.outbound)
if outPeers.len() < node.pubsub.parameters.dOut:
belowDOutSubnets.setBit(subNetId)
if belowDLowSubnets.countOnes() > 0:
return belowDLowSubnets
if lowOutgoingSubnets.countOnes() > 0:
return lowOutgoingSubnets
return belowDOutSubnets
if belowDLowSubnets.countOnes() > 0:
belowDLowSubnets
elif lowOutgoingSubnets.countOnes() > 0:
lowOutgoingSubnets
else:
belowDOutSubnets
return (
findLowSubnets(getAttestationTopic, SubnetId, ATTESTATION_SUBNET_COUNT),
findLowSubnets(getSyncCommitteeTopic, SyncCommitteeIndex, SYNC_COMMITTEE_SUBNET_COUNT)
)
proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
debug "Starting discovery loop"
while true:
let
wantedAttnets = node.getLowAttnets()
(wantedAttnets, wantedSyncnets) = node.getLowSubnets()
wantedAttnetsCount = wantedAttnets.countOnes()
wantedSyncnetsCount = wantedSyncnets.countOnes()
if wantedAttnetsCount > 0:
let discoveredNodes = await node.discovery.queryRandom(node.forkId, wantedAttnets)
if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0:
let discoveredNodes = await node.discovery.queryRandom(
node.forkId, wantedAttnets, wantedSyncnets)
var newPeers = 0
for discNode in discoveredNodes:

View File

@ -30,6 +30,9 @@ proc generateNode(rng: ref BrHmacDrbgContext, port: Port,
Eth2DiscoveryProtocol.new(keys.PrivateKey.random(rng[]),
some(ip), some(port), some(port), port, ip, enrFields, rng = rng)
# TODO: Add tests with a syncnets preference
const noSyncnetsPreference = BitArray[SYNC_COMMITTEE_SUBNET_COUNT]()
suite "Eth2 specific discovery tests":
let
rng = keys.newRng()
@ -59,7 +62,8 @@ suite "Eth2 specific discovery tests":
attnetsSelected.setBit(42)
attnetsSelected.setBit(34)
let discovered = await node1.queryRandom(enrForkId, attnetsSelected)
let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
check discovered.len == 1
await node1.closeWait()
@ -96,7 +100,8 @@ suite "Eth2 specific discovery tests":
attnetsSelected.setBit(15)
attnetsSelected.setBit(42)
let discovered = await node1.queryRandom(enrForkId, attnetsSelected)
let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
check discovered.len == 1
await node1.closeWait()
@ -123,7 +128,8 @@ suite "Eth2 specific discovery tests":
attnetsSelected.setBit(2)
block:
let discovered = await node1.queryRandom(enrForkId, attnetsSelected)
let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
check discovered.len == 0
block:
@ -137,7 +143,8 @@ suite "Eth2 specific discovery tests":
check nodes.isOk() and nodes[].len > 0
discard node1.addNode(nodes[][0])
let discovered = await node1.queryRandom(enrForkId, attnetsSelected)
let discovered = await node1.queryRandom(
enrForkId, attnetsSelected, noSyncnetsPreference)
check discovered.len == 1
await node1.closeWait()