add: logic constructing valid set of peers

This commit is contained in:
Agnish Ghosh 2024-07-19 13:58:26 +05:30
parent e034f30b5e
commit fa5b15436c
No known key found for this signature in database
GPG Key ID: 7BDDA05D1B25E9F8
3 changed files with 101 additions and 30 deletions

View File

@ -27,7 +27,8 @@ import
eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2], eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2],
".."/[version, conf, beacon_clock, conf_light_client], ".."/[version, conf, beacon_clock, conf_light_client],
../spec/datatypes/[phase0, altair, bellatrix, eip7594], ../spec/datatypes/[phase0, altair, bellatrix, eip7594],
../spec/[eth2_ssz_serialization, network, helpers, forks], ../spec/[eth2_ssz_serialization, network,
helpers, eip7594_helpers, forks],
../validators/keystore_management, ../validators/keystore_management,
"."/[eth2_discovery, eth2_protocol_dsl, libp2p_json_serialization, peer_pool, peer_scores] "."/[eth2_discovery, eth2_protocol_dsl, libp2p_json_serialization, peer_pool, peer_scores]
@ -1482,7 +1483,8 @@ proc trimConnections(node: Eth2Node, count: int) =
inc(nbc_cycling_kicked_peers) inc(nbc_cycling_kicked_peers)
if toKick <= 0: return if toKick <= 0: return
proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits, CscBits) = proc getLowSubnets(node: Eth2Node, epoch: Epoch):
(AttnetBits, SyncnetBits, CscBits) =
# Returns the subnets required to have a healthy mesh # Returns the subnets required to have a healthy mesh
# The subnets are computed, to, in order: # The subnets are computed, to, in order:
# - Have 0 subnet with < `dLow` peers from topic subscription # - Have 0 subnet with < `dLow` peers from topic subscription
@ -1575,7 +1577,11 @@ proc runDiscoveryLoop(node: Eth2Node) {.async.} =
else: else:
0 0
discoveredNodes = await node.discovery.queryRandom( discoveredNodes = await node.discovery.queryRandom(
node.discoveryForkId, wantedAttnets, wantedSyncnets, wantedCscnets, minScore) node.discoveryForkId,
wantedAttnets,
wantedSyncnets,
wantedCscnets,
minScore)
let newPeers = block: let newPeers = block:
var np = newSeq[PeerAddr]() var np = newSeq[PeerAddr]()
@ -1624,6 +1630,97 @@ proc runDiscoveryLoop(node: Eth2Node) {.async.} =
# Also, give some time to dial the discovered nodes and update stats etc # Also, give some time to dial the discovered nodes and update stats etc
await sleepAsync(5.seconds) await sleepAsync(5.seconds)
proc getNodeIdFromPeer*(peer: Peer): NodeId=
# Convert peer id to node id by extracting the peer's public key
let nodeId =
block:
var key: PublicKey
discard peer.peerId.extractPublicKey(key)
keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId()
nodeId
proc fetchCustodyColumnCountFromRemotePeer*(node: Eth2Node,
pid: PeerId):
uint64 =
# Fetches the custody column count from a remote peer
# if the peer advertises their custody column count
# via the `csc` ENR field. If the peer does NOT, then
# the default value is assume, i.e, CUSTODY_REQUIREMENT
let
eth2node = node
peer = eth2node.getPeer(pid)
let enrOpt = peer.enr
if enrOpt.isNone:
debug "Could not get ENR from peer",
peer_id = pid
return 0
else:
let
enr = enrOpt.get
enrFieldOpt =
enr.get(enrCustodySubnetCountField, uint64)
if not enrFieldOpt.isOk:
debug "Issue with fetching `csc` field from ENR",
enr = enr
else:
return(enrFieldOpt.get)
proc constructValidCustodyPeers*(node: Eth2Node,
peers: openArray[Peer],
config: BeaconNodeConf):
seq[PeerId] =
# `constructValidaCustodyPeers` returns a list of peers that are an overall
# superset of the given local node's custody columns and the custody columns
# of the remote peers
# Get the local custody subnet count
var localCustodySubnetCount: uint64
if config.subscribeAllSubnets:
localCustodySubnetCount = DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
localCustodySubnetCount = CUSTODY_REQUIREMENT
# Get the local custody columns
let
localNodeId = node.nodeId
localCustodyColumns =
localNodeId.get_custody_columns(localCustodySubnetCount).get
var validPeerIds = newSeqOfCap[PeerId](peers.len)
for peer in peers:
# Get the custody subnet counts of the remote peer
let remoteCustodySubnetCount =
node.fetchCustodyColumnCountFromRemotePeer(peer.peerId)
# Extract remote peer's nodeID from peerID
# Fetch custody columns fromm remote peer
let
remoteNodeId = getNodeIdFromPeer(peer)
remoteCustodyColumns =
remoteNodeId.get_custody_columns(remoteCustodySubnetCount).get
# If the remote peer custodies less columns than our local node
# We skip it
if remoteCustodyColumns.len < localCustodyColumns.len:
continue
# If the remote peer custodies all the possible columns
if remoteCustodyColumns.len == NUMBER_OF_COLUMNS:
validPeerIds.add(peer.peerId)
# Filtering out the invalid peers
for column in localCustodyColumns:
if column notin remoteCustodyColumns:
continue
# Otherwise add the peer to the set
# of valid peerIds
validPeerIds.add(peer.peerId)
validPeerIds
proc resolvePeer(peer: Peer) = proc resolvePeer(peer: Peer) =
# Resolve task which performs searching of peer's public key and recovery of # Resolve task which performs searching of peer's public key and recovery of
# ENR using discovery5. We only resolve ENR for peers we know about to avoid # ENR using discovery5. We only resolve ENR for peers we know about to avoid

View File

@ -1136,32 +1136,6 @@ proc fetchCustodySubnetCount* (node: BeaconNode): uint64=
res = DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64 res = DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
res res
proc fetchCustodyColumnCountFromRemotePeer*(node: BeaconNode, pid: PeerId): uint64 =
# Fetches the custody column count from a remote peer
# if the peer advertises their custody column count
# via the `csc` ENR field. If the peer does NOT, then
# the default value is assume, i.e, CUSTODY_REQUIREMENT
let
eth2node = node.network
peer = eth2node.getPeer(pid)
let enrOpt = peer.enr
if enrOpt.isNone:
debug "Could not get ENR from peer",
peer_id = pid
return 0
else:
let
enr = enrOpt.get
enrFieldOpt = enr.get(enrCustodySubnetCountField, uint64)
if not enrFieldOpt.isOk:
debug "Issue with fetching `csc` field from ENR",
enr = enr
else:
return(enrFieldOpt.get)
proc addDenebMessageHandlers( proc addDenebMessageHandlers(
node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = node: BeaconNode, forkDigest: ForkDigest, slot: Slot) =
node.addCapellaMessageHandlers(forkDigest, slot) node.addCapellaMessageHandlers(forkDigest, slot)

View File

@ -548,7 +548,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
withBlck(blck[]): withBlck(blck[]):
when consensusFork >= ConsensusFork.Deneb: when consensusFork >= ConsensusFork.Deneb:
if forkyBlck.message.body.blob_kzg_commitments.len > 0: if forkyBlck.message.body.blob_kzg_commitments.len > 0:
hasColumns = true hasColumns = true
break break
hasColumns hasColumns