Peer cycling (#2668)
* start of peer cycling * discover new peers when low coverage * disconnect unresponsive peers * cleaning up a bit * smarter cycling * remove lowpeer event * use sync_protocol to get metadata * moved cycling to discovery * fix discovery for tests * change test * various tweaks * Faster discovery * start of big refacto * removed peer_balancer * updated test * added lazydial * allow at least 10 incoming conn * popcount -> countOnes * allow max_conn - outgoingconn incoming connections * add dOut check * Removed lazyDial option and better kick * use attnets in scoring * only score peer with metadata * cleanup * use topic subscription * rework dialing filtering, fix metadata compat issue
This commit is contained in:
parent
79b2539ef0
commit
d9f7ba7153
|
@ -13,7 +13,7 @@ import
|
|||
std/options as stdOptions,
|
||||
|
||||
# Status libs
|
||||
stew/[leb128, base58, endians2, results, byteutils, io2], bearssl,
|
||||
stew/[leb128, base58, endians2, results, byteutils, io2, bitops2], bearssl,
|
||||
stew/shims/net as stewNet,
|
||||
stew/shims/[macros, tables],
|
||||
faststreams/[inputs, outputs, buffers], snappy, snappy/framing,
|
||||
|
@ -26,7 +26,7 @@ import
|
|||
libp2p/muxers/muxer, libp2p/muxers/mplex/mplex,
|
||||
libp2p/transports/[transport, tcptransport],
|
||||
libp2p/protocols/secure/[secure, noise],
|
||||
libp2p/protocols/pubsub/[pubsub, gossipsub, rpc/message, rpc/messages],
|
||||
libp2p/protocols/pubsub/[pubsub, gossipsub, rpc/message, rpc/messages, peertable, pubsubpeer],
|
||||
libp2p/transports/tcptransport,
|
||||
libp2p/stream/connection,
|
||||
libp2p/utils/semaphore,
|
||||
|
@ -85,6 +85,7 @@ type
|
|||
rng*: ref BrHmacDrbgContext
|
||||
peers*: Table[PeerID, Peer]
|
||||
validTopics: HashSet[string]
|
||||
peerPingerHeartbeatFut: Future[void]
|
||||
cfg: RuntimeConfig
|
||||
getBeaconTime: GetBeaconTimeFn
|
||||
|
||||
|
@ -106,6 +107,8 @@ type
|
|||
lastReqTime*: Moment
|
||||
connections*: int
|
||||
enr*: Option[enr.Record]
|
||||
metadata*: Option[phase0.MetaData]
|
||||
lastMetadataTime*: Moment
|
||||
direction*: PeerType
|
||||
disconnectedFut: Future[void]
|
||||
|
||||
|
@ -591,7 +594,10 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
|
|||
deadline): return neterr StreamOpenTimeout
|
||||
try:
|
||||
# Send the request
|
||||
await stream.writeChunk(none ResponseCode, requestBytes)
|
||||
# Some clients don't want a length sent for empty requests
|
||||
# So don't send anything on empty requests
|
||||
if requestBytes.len > 0:
|
||||
await stream.writeChunk(none ResponseCode, requestBytes)
|
||||
# Half-close the stream to mark the end of the request - if this is not
|
||||
# done, the other peer might never send us the response.
|
||||
await stream.close()
|
||||
|
@ -901,59 +907,165 @@ proc toPeerAddr(node: Node): Result[PeerAddr, cstring] =
|
|||
|
||||
proc queryRandom*(d: Eth2DiscoveryProtocol, forkId: ENRForkID,
|
||||
attnets: BitArray[ATTESTATION_SUBNET_COUNT]):
|
||||
Future[seq[PeerAddr]] {.async.} =
|
||||
## Perform a discovery query for a random target matching the eth2 field
|
||||
Future[seq[Node]] {.async.} =
|
||||
## Perform a discovery query for a random target
|
||||
## (forkId) and matching at least one of the attestation subnets.
|
||||
let nodes = await d.queryRandom()
|
||||
let sszForkId = SSZ.encode(forkId)
|
||||
|
||||
var filtered: seq[PeerAddr]
|
||||
let eth2Field = SSZ.encode(forkId)
|
||||
let nodes = await d.queryRandom((enrForkIdField, eth2Field))
|
||||
|
||||
var filtered: seq[(int, Node)]
|
||||
for n in nodes:
|
||||
if n.record.contains((enrForkIdField, sszForkId)):
|
||||
let res = n.record.tryGet(enrAttestationSubnetsField, seq[byte])
|
||||
let res = n.record.tryGet(enrAttestationSubnetsField, seq[byte])
|
||||
|
||||
if res.isSome():
|
||||
let attnetsNode =
|
||||
try:
|
||||
SSZ.decode(res.get(), BitArray[ATTESTATION_SUBNET_COUNT])
|
||||
except SszError as e:
|
||||
debug "Could not decode attestation subnet bitfield of peer",
|
||||
peer = n.record.toURI(), exception = e.name, msg = e.msg
|
||||
continue
|
||||
if res.isSome():
|
||||
let attnetsNode =
|
||||
try:
|
||||
SSZ.decode(res.get(), BitArray[ATTESTATION_SUBNET_COUNT])
|
||||
except SszError as e:
|
||||
debug "Could not decode attestation subnet bitfield of peer",
|
||||
peer = n.record.toURI(), exception = e.name, msg = e.msg
|
||||
continue
|
||||
|
||||
for i in 0..<attnetsNode.bytes.len:
|
||||
if (attnets.bytes[i] and attnetsNode.bytes[i]) > 0:
|
||||
# we have at least one subnet match
|
||||
let peerAddr = n.toPeerAddr()
|
||||
if peerAddr.isOk():
|
||||
filtered.add(peerAddr.get())
|
||||
break
|
||||
var score: int = 0
|
||||
for i in 0..<ATTESTATION_SUBNET_COUNT:
|
||||
if attnets[i] and attnetsNode[i]:
|
||||
inc score
|
||||
|
||||
if score > 0:
|
||||
filtered.add((score, n))
|
||||
|
||||
d.rng[].shuffle(filtered)
|
||||
return filtered.sortedByIt(it[0]).mapIt(it[1])
|
||||
|
||||
proc trimConnections(node: Eth2Node, count: int) {.async.} =
|
||||
# Kill `count` peers, scoring them to remove the least useful ones
|
||||
|
||||
var scores = initOrderedTable[PeerID, int]()
|
||||
|
||||
# Take into account the stabilitySubnets
|
||||
# During sync, only this will be used to score peers
|
||||
#
|
||||
# A peer subscribed to all stabilitySubnets will
|
||||
# have 640 points
|
||||
for peer in node.peers.values:
|
||||
if peer.connectionState != Connected: continue
|
||||
if peer.metadata.isNone: continue
|
||||
|
||||
let
|
||||
stabilitySubnets = peer.metadata.get().attnets
|
||||
stabilitySubnetsCount = stabilitySubnets.countOnes()
|
||||
thisPeersScore = 10 * stabilitySubnetsCount
|
||||
|
||||
scores[peer.info.peerId] = thisPeersScore
|
||||
|
||||
# Split a 1000 points for each topic's peers
|
||||
# This gives priority to peers in topics with few peers
|
||||
# For instance, a topic with `dHigh` peers will give 80 points to each peer
|
||||
# Whereas a topic with `dLow` peers will give 250 points to each peer
|
||||
for topic, _ in node.pubsub.topics:
|
||||
let
|
||||
peerCount = node.pubsub.mesh.peers(topic)
|
||||
scorePerPeer = 1_000 div max(peerCount, 1)
|
||||
|
||||
if peerCount == 0: continue
|
||||
|
||||
for peer in node.pubsub.mesh[topic]:
|
||||
if peer.peerId notin scores: continue
|
||||
|
||||
# Divide by the number of connections
|
||||
# A peer using multiple connections is wasteful
|
||||
let
|
||||
connCount = node.switch.connmanager.connCount(peer.peerId)
|
||||
thisPeersScore = scorePerPeer div max(1, connCount)
|
||||
|
||||
scores[peer.peerId] = scores[peer.peerId] + thisPeersScore
|
||||
|
||||
proc sortPerScore(a, b: (PeerID, int)): int =
|
||||
system.cmp(a[1], b[1])
|
||||
|
||||
scores.sort(sortPerScore)
|
||||
|
||||
var toKick = count
|
||||
|
||||
for peerId in scores.keys:
|
||||
#TODO kill a single connection instead of the whole peer
|
||||
# Not possible with the current libp2p's conn management
|
||||
debug "kicking peer", peerId
|
||||
await node.switch.disconnect(peerId)
|
||||
dec toKick
|
||||
if toKick <= 0: return
|
||||
|
||||
proc getLowAttnets(node: Eth2Node): BitArray[ATTESTATION_SUBNET_COUNT] =
|
||||
# Returns the subnets required to have a healthy mesh
|
||||
# The subnets are computed, to, in order:
|
||||
# - Have 0 subscribed subnet below `dLow`
|
||||
# - Have 0 subnet with < `d` peers from topic subscription
|
||||
# - Have 0 subscribed subnet below `dOut` outgoing peers
|
||||
|
||||
var
|
||||
lowOutgoingSubnets: BitArray[ATTESTATION_SUBNET_COUNT]
|
||||
belowDLowSubnets: BitArray[ATTESTATION_SUBNET_COUNT]
|
||||
belowDOutSubnets: BitArray[ATTESTATION_SUBNET_COUNT]
|
||||
|
||||
for subNetId in 0..<ATTESTATION_SUBNET_COUNT:
|
||||
let topic =
|
||||
getAttestationTopic(node.forkId.forkDigest, SubnetId(subNetId)) & "_snappy"
|
||||
|
||||
if node.pubsub.gossipsub.peers(topic) < node.pubsub.parameters.d:
|
||||
lowOutgoingSubnets.setBit(subNetId)
|
||||
|
||||
# Not subscribed
|
||||
if topic notin node.pubsub.mesh: continue
|
||||
|
||||
if node.pubsub.mesh.peers(topic) < node.pubsub.parameters.dLow:
|
||||
belowDlowSubnets.setBit(subNetId)
|
||||
|
||||
let outPeers = node.pubsub.mesh.getOrDefault(topic).toSeq().filterIt(it.outbound)
|
||||
if outPeers.len() < node.pubsub.parameters.dOut:
|
||||
belowDOutSubnets.setBit(subNetId)
|
||||
|
||||
if belowDLowSubnets.countOnes() > 0:
|
||||
return belowDLowSubnets
|
||||
|
||||
if lowOutgoingSubnets.countOnes() > 0:
|
||||
return lowOutgoingSubnets
|
||||
|
||||
return belowDOutSubnets
|
||||
|
||||
return filtered
|
||||
|
||||
proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
|
||||
debug "Starting discovery loop"
|
||||
|
||||
while true:
|
||||
if node.switch.connManager.outSema.count > 0:
|
||||
let forkId = (enrForkIdField, SSZ.encode(node.forkId))
|
||||
var discoveredNodes = await node.discovery.queryRandom(forkId)
|
||||
let
|
||||
wantedAttnets = node.getLowAttnets()
|
||||
wantedAttnetsCount = wantedAttnets.countOnes()
|
||||
|
||||
if wantedAttnetsCount > 0:
|
||||
let discoveredNodes = await node.discovery.queryRandom(node.forkId, wantedAttnets)
|
||||
|
||||
var newPeers = 0
|
||||
for discNode in discoveredNodes:
|
||||
let res = discNode.toPeerAddr()
|
||||
if res.isOk():
|
||||
let peerAddr = res.get()
|
||||
# Waiting for an empty space in PeerPool.
|
||||
while true:
|
||||
if node.peerPool.lenSpace({PeerType.Outgoing}) == 0:
|
||||
await node.peerPool.waitForEmptySpace(PeerType.Outgoing)
|
||||
else:
|
||||
break
|
||||
# Check if peer present in SeenTable or PeerPool.
|
||||
if node.checkPeer(peerAddr):
|
||||
if peerAddr.peerId notin node.connTable:
|
||||
# We adding to pending connections table here, but going
|
||||
# to remove it only in `connectWorker`.
|
||||
|
||||
# If we are full, try to kick a peer (3 max)
|
||||
for _ in 0..<3:
|
||||
if node.peerPool.lenSpace({PeerType.Outgoing}) == 0 and newPeers == 0:
|
||||
await node.trimConnections(1)
|
||||
else: break
|
||||
|
||||
if node.peerPool.lenSpace({PeerType.Outgoing}) == 0:
|
||||
# No room anymore
|
||||
break
|
||||
|
||||
node.connTable.incl(peerAddr.peerId)
|
||||
await node.connQueue.addLast(peerAddr)
|
||||
inc(newPeers)
|
||||
|
@ -979,7 +1091,9 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
|
|||
|
||||
# Discovery `queryRandom` can have a synchronous fast path for example
|
||||
# when no peers are in the routing table. Don't run it in continuous loop.
|
||||
await sleepAsync(1.seconds)
|
||||
#
|
||||
# Also, give some time to dial the discovered nodes and update stats etc
|
||||
await sleepAsync(5.seconds)
|
||||
|
||||
proc getPersistentNetMetadata*(config: BeaconNodeConf): altair.MetaData
|
||||
{.raises: [Defect, IOError, SerializationError].} =
|
||||
|
@ -1170,7 +1284,7 @@ proc new*(T: type Eth2Node, config: BeaconNodeConf, runtimeCfg: RuntimeConfig,
|
|||
discoveryEnabled: discovery,
|
||||
rng: rng,
|
||||
connectTimeout: connectTimeout,
|
||||
seenThreshold: seenThreshold,
|
||||
seenThreshold: seenThreshold
|
||||
)
|
||||
|
||||
newSeq node.protocolStates, allProtocols.len
|
||||
|
@ -1212,6 +1326,8 @@ proc startListening*(node: Eth2Node) {.async.} =
|
|||
|
||||
await node.pubsub.start()
|
||||
|
||||
proc peerPingerHeartbeat(node: Eth2Node): Future[void] {.gcsafe.}
|
||||
|
||||
proc start*(node: Eth2Node) {.async.} =
|
||||
|
||||
proc onPeerCountChanged() =
|
||||
|
@ -1240,6 +1356,7 @@ proc start*(node: Eth2Node) {.async.} =
|
|||
let pa = tr.get().toPeerAddr(tcpProtocol)
|
||||
if pa.isOk():
|
||||
await node.connQueue.addLast(pa.get())
|
||||
node.peerPingerHeartbeatFut = node.peerPingerHeartbeat()
|
||||
|
||||
proc stop*(node: Eth2Node) {.async.} =
|
||||
# Ignore errors in futures, since we're shutting down (but log them on the
|
||||
|
@ -1261,6 +1378,7 @@ proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer =
|
|||
network: network,
|
||||
connectionState: ConnectionState.None,
|
||||
lastReqTime: now(chronos.Moment),
|
||||
lastMetadataTime: now(chronos.Moment),
|
||||
protocolStates: newSeq[RootRef](len(allProtocols))
|
||||
)
|
||||
for i in 0 ..< len(allProtocols):
|
||||
|
@ -1394,6 +1512,51 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
|
|||
result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
|
||||
return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit)
|
||||
|
||||
#Must import here because of cyclicity
|
||||
import ../sync/sync_protocol
|
||||
|
||||
proc updatePeerMetadata(node: Eth2Node, peerId: PeerID) {.async.} =
|
||||
trace "updating peer metadata", peerId
|
||||
|
||||
var peer = node.getPeer(peerId)
|
||||
let response = await peer.getMetaData()
|
||||
|
||||
if response.isErr:
|
||||
debug "Failed to retrieve metadata from peer!", peerId
|
||||
return
|
||||
|
||||
let newMetadata = response.get()
|
||||
peer.metadata = some(newMetadata)
|
||||
peer.lastMetadataTime = Moment.now()
|
||||
|
||||
proc peerPingerHeartbeat(node: Eth2Node) {.async.} =
|
||||
while true:
|
||||
let heartbeatStart_m = Moment.now()
|
||||
var updateFutures: seq[Future[void]]
|
||||
|
||||
for peer in node.peers.values:
|
||||
if peer.connectionState != Connected: continue
|
||||
|
||||
if peer.metadata.isNone or
|
||||
heartbeatStart_m - peer.lastMetadataTime > 30.minutes:
|
||||
updateFutures.add(node.updatePeerMetadata(peer.info.peerId))
|
||||
|
||||
discard await allFinished(updateFutures)
|
||||
|
||||
for peer in node.peers.values:
|
||||
if peer.connectionState != Connected: continue
|
||||
let lastMetadata =
|
||||
if peer.metadata.isNone:
|
||||
peer.lastMetadataTime
|
||||
else:
|
||||
peer.lastMetadataTime + 30.minutes
|
||||
|
||||
if heartbeatStart_m - lastMetadata > 30.seconds:
|
||||
debug "no metadata for 30 seconds, kicking peer", peer
|
||||
asyncSpawn peer.disconnect(PeerScoreLow)
|
||||
|
||||
await sleepAsync(8.seconds)
|
||||
|
||||
func asLibp2pKey*(key: keys.PublicKey): PublicKey =
|
||||
PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key))
|
||||
|
||||
|
|
Loading…
Reference in New Issue