add filtering to peer pool and protocol, remove filtering from SyncManager level

This commit is contained in:
Agnish Ghosh 2024-08-17 20:51:36 +05:30
parent f3f61cbbff
commit cd5532c078
5 changed files with 114 additions and 117 deletions

View File

@ -1668,7 +1668,7 @@ proc resolvePeer(peer: Peer) =
proc handlePeer*(peer: Peer) {.async: (raises: [CancelledError]).} =
let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction)
case res:
of PeerStatus.LowScoreError, PeerStatus.NoSpaceError:
of PeerStatus.LowScoreError, PeerStatus.LowCscError, PeerStatus.NoSpaceError:
# Peer has low score or we do not have enough space in PeerPool,
# we are going to disconnect it gracefully.
# Peer' state will be updated in connection event.
@ -1821,7 +1821,8 @@ proc new(T: type Eth2Node,
config, ip, tcpPort, udpPort, privKey,
{
enrForkIdField: SSZ.encode(enrForkId),
enrAttestationSubnetsField: SSZ.encode(metadata.attnets)
enrAttestationSubnetsField: SSZ.encode(metadata.attnets),
enrCustodySubnetCountField: SSZ.encode(metadata.custody_subnet_count)
},
rng),
discoveryEnabled: discovery,
@ -1841,6 +1842,31 @@ proc new(T: type Eth2Node,
proc scoreCheck(peer: Peer): bool =
peer.score >= PeerScoreLowLimit
proc fetchCustodyColumnCountFromRemotePeer(peer: Peer):
uint64 =
# Fetches the custody column count from a remote peer
# if the peer advertises their custody column count
# via the `csc` ENR field. If the peer does NOT, then
# the default value is assume, i.e, CUSTODY_REQUIREMENT
let enrOpt = peer.enr
if enrOpt.isNone:
debug "Could not get ENR from peer",
peer_id = peer.peerId
return 0
else:
let
enr = enrOpt.get
enrFieldOpt =
enr.get(enrCustodySubnetCountField, uint64)
if not enrFieldOpt.isOk:
debug "Issue with fetching `csc` field from ENR",
enr = enr
else:
return(enrFieldOpt.get)
proc onDeletePeer(peer: Peer) =
peer.releasePeer()

View File

@ -9,6 +9,8 @@
import std/[tables, heapqueue]
import chronos
import
../spec/datatypes/[eip7594]
export tables
@ -27,7 +29,8 @@ type
DuplicateError, ## Peer is already present in PeerPool.
NoSpaceError, ## There no space for the peer in PeerPool.
LowScoreError, ## Peer has too low score.
DeadPeerError ## Peer is already dead.
DeadPeerError, ## Peer is already dead.
LowCscError, ## Peer's custody subnet count is lower than minimum requirment.
PeerItem[T] = object
data: T
@ -283,6 +286,14 @@ proc checkPeerScore*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} =
else:
true
proc checkCscCount*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} =
## Returns ``true`` if peer had MINIMUM_CUSTODY_REQUIREMENT
let csc = pool.fetchCustodyColumnCountFromRemotePeer(peer)
if csc >= CUSTODY_REQUIREMENT:
true
else:
false
proc peerCountChanged[A, B](pool: PeerPool[A, B]) =
## Call callback when number of peers changed.
if not(isNil(pool.peerCounter)):
@ -380,6 +391,7 @@ proc checkPeer*[A, B](pool: PeerPool[A, B], peer: A): PeerStatus {.inline.} =
## * Positive value of peer's score - (PeerStatus.LowScoreError)
## * Peer's key is not present in PeerPool - (PeerStatus.DuplicateError)
## * Peer's lifetime future is not finished yet - (PeerStatus.DeadPeerError)
## * Low value of peer's custody subnet count - (PeerStatus.LowCscError)
##
## If peer could be added to PeerPool procedure returns (PeerStatus.Success)
mixin getKey, getFuture

View File

@ -10,6 +10,7 @@
import
chronicles,
../spec/network,
../spec/datatypes/[eip7594],
".."/[beacon_clock],
../networking/eth2_network,
../consensus_object_pools/blockchain_dag,
@ -149,6 +150,16 @@ p2pProtocol PeerSync(version = 1,
let
ourStatus = peer.networkState.getCurrentStatus()
theirStatus = await peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR)
if incoming == true:
let
metadataRes = await peer.getMetadata_v3()
metadata = metadataRes.get
if metadata.custody_subnet_count < CUSTODY_REQUIREMENT:
debug "Custody requirement is lesser than min, peer disconnected",
peer, errorKind = theirStatus.error.kind
await peer.disconnect(FaultOrError)
else: discard
if theirStatus.isOk:
discard await peer.handleStatus(peer.networkState, theirStatus.get())

View File

@ -268,6 +268,31 @@ proc fetchBlobsFromNetwork(self: RequestManager,
if not(isNil(peer)):
self.network.peerPool.release(peer)
proc lookupCscFromPeer(peer: Peer):
uint64 =
# Fetches the custody column count from a remote peer
# if the peer advertises their custody column count
# via the `csc` ENR field. If the peer does NOT, then
# the default value is assume, i.e, CUSTODY_REQUIREMENT
let enrOpt = peer.enr
if enrOpt.isNone:
debug "Could not get ENR from peer",
peer_id = peer.peerId
return 0
else:
let
enr = enrOpt.get
enrFieldOpt =
enr.get(enrCustodySubnetCountField, uint64)
if not enrFieldOpt.isOk:
debug "Issue with fetching `csc` field from ENR",
enr = enr
else:
return(enrFieldOpt.get)
proc constructValidCustodyPeers(rman: RequestManager,
peers: openArray[Peer]):
seq[Peer] =
@ -288,7 +313,7 @@ proc constructValidCustodyPeers(rman: RequestManager,
for peer in peers:
# Get the custody subnet count of the remote peer
let remoteCustodySubnetCount =
peer.fetchCustodyColumnCountFromRemotePeer()
peer.lookupCscFromPeer()
# Extract remote peer's nodeID from peerID
# Fetch custody columns from remote peer

View File

@ -326,81 +326,6 @@ func checkDataColumns(data_columns: seq[DataColumnSidecars]): Result[void, strin
? data_column_sidecar[].verify_data_column_sidecar_inclusion_proof()
ok()
proc fetchCustodyColumnCountFromRemotePeer*(peer: Peer):
uint64 =
# Fetches the custody column count from a remote peer
# if the peer advertises their custody column count
# via the `csc` ENR field. If the peer does NOT, then
# the default value is assume, i.e, CUSTODY_REQUIREMENT
let enrOpt = peer.enr
if enrOpt.isNone:
debug "Could not get ENR from peer",
peer_id = peer.peerId
return 0
else:
let
enr = enrOpt.get
enrFieldOpt =
enr.get(enrCustodySubnetCountField, uint64)
if not enrFieldOpt.isOk:
debug "Issue with fetching `csc` field from ENR",
enr = enr
else:
return(enrFieldOpt.get)
proc checkValidPeerCustody*[A, B](man: SyncManager[A, B], peer: A): bool =
# `constructValidaCustodyPeers` returns a list of peers that are an overall
# superset of the given local node's custody columns and the custody columns
# of the remote peers
# Get the local custody subnet count
let localCustodySubnetCount =
if man.supernode:
DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
else:
CUSTODY_REQUIREMENT
# Get the local custody columns
let
localNodeId = peer.getNodeIdFromPeer()
localCustodyColumns =
localNodeId.get_custody_columns(localCustodySubnetCount).get
var validPeerIds: seq[PeerId]
# Get the custody subnet counts of the remote peer
let remoteCustodySubnetCount =
peer.fetchCustodyColumnCountFromRemotePeer()
# Extract remote peer's nodeID from peerID
# Fetch custody columns fromm remote peer
let
remoteNodeId = peer.getNodeIdFromPeer()
remoteCustodyColumns =
remoteNodeId.get_custody_columns(remoteCustodySubnetCount).get
# If the remote peer custodies less columns than our local node
# We skip it
if remoteCustodyColumns.len < localCustodyColumns.len:
return false
# If the remote peer custodies all the possible columns
if remoteCustodyColumns.len == NUMBER_OF_COLUMNS:
return true
# Filtering out the invalid peers
for column in localCustodyColumns:
if column notin remoteCustodyColumns:
return true
# Otherwise add the peer to the set
# of valid peerIds
true
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
{.async: (raises: [CancelledError]).} =
logScope:
@ -632,48 +557,46 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
let dataColumnData =
if shouldGetDataColumns:
if man.checkValidPeerCustody(peer):
let data_columns = await man.getDataColumnSidecars(peer, req)
if data_columns.isErr():
# peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Failed to receive data columns on request",
request = req, err = data_columns.error
return
let dataColumnData = data_columns.get().asSeq()
let dataColumnSmap = getShortMap(req, dataColumnData)
debug "Received data columns on request", data_columns_count = len(dataColumnData),
data_columns_map = dataColumnSmap, request = req
let data_columns = await man.getDataColumnSidecars(peer, req)
if data_columns.isErr():
# peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Failed to receive data columns on request",
request = req, err = data_columns.error
return
let dataColumnData = data_columns.get().asSeq()
let dataColumnSmap = getShortMap(req, dataColumnData)
debug "Received data columns on request", data_columns_count = len(dataColumnData),
data_columns_map = dataColumnSmap, request = req
if len(dataColumnData) > 0:
let slots = mapIt(dataColumnData, it[].signed_block_header.message.slot)
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
if not(checkResponse(req, uniqueSlots)):
# peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received data columns sequence is not in requested range",
data_columns_count = len(dataColumnData), data_columns_map = getShortMap(req, dataColumnData),
request = req
return
let groupedDataColumns = groupDataColumns(req, blockData, dataColumnData)
if groupedDataColumns.isErr():
# peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
# warn "Received data columns is inconsistent",
# data_columns_map = getShortMap(req, dataColumnData), request = req, msg=groupedDataColumns.error()
return
if (let checkRes = groupedDataColumns.get.checkDataColumns(); checkRes.isErr):
if len(dataColumnData) > 0:
let slots = mapIt(dataColumnData, it[].signed_block_header.message.slot)
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
if not(checkResponse(req, uniqueSlots)):
# peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received data columns is invalid",
data_columns_count = len(dataColumnData),
data_columns_map = getShortMap(req, dataColumnData),
request = req,
msg = checkRes.error
warn "Received data columns sequence is not in requested range",
data_columns_count = len(dataColumnData), data_columns_map = getShortMap(req, dataColumnData),
request = req
return
Opt.some(groupedDataColumns.get())
else:
Opt.none(seq[DataColumnSidecars])
let groupedDataColumns = groupDataColumns(req, blockData, dataColumnData)
if groupedDataColumns.isErr():
# peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
# warn "Received data columns is inconsistent",
# data_columns_map = getShortMap(req, dataColumnData), request = req, msg=groupedDataColumns.error()
return
if (let checkRes = groupedDataColumns.get.checkDataColumns(); checkRes.isErr):
# peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received data columns is invalid",
data_columns_count = len(dataColumnData),
data_columns_map = getShortMap(req, dataColumnData),
request = req,
msg = checkRes.error
return
Opt.some(groupedDataColumns.get())
else:
Opt.none(seq[DataColumnSidecars])