From cd5532c07836e64893aa5ed2d9f536ff20127a07 Mon Sep 17 00:00:00 2001 From: Agnish Ghosh Date: Sat, 17 Aug 2024 20:51:36 +0530 Subject: [PATCH] add filtering to peer pool and protocol, remove filtering from SyncManager level --- beacon_chain/networking/eth2_network.nim | 30 ++++- beacon_chain/networking/peer_pool.nim | 14 +- beacon_chain/networking/peer_protocol.nim | 11 ++ beacon_chain/sync/request_manager.nim | 27 +++- beacon_chain/sync/sync_manager.nim | 149 ++++++---------------- 5 files changed, 114 insertions(+), 117 deletions(-) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index f017bf52b..363b22a7d 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -1668,7 +1668,7 @@ proc resolvePeer(peer: Peer) = proc handlePeer*(peer: Peer) {.async: (raises: [CancelledError]).} = let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction) case res: - of PeerStatus.LowScoreError, PeerStatus.NoSpaceError: + of PeerStatus.LowScoreError, PeerStatus.LowCscError, PeerStatus.NoSpaceError: # Peer has low score or we do not have enough space in PeerPool, # we are going to disconnect it gracefully. # Peer' state will be updated in connection event. @@ -1821,7 +1821,8 @@ proc new(T: type Eth2Node, config, ip, tcpPort, udpPort, privKey, { enrForkIdField: SSZ.encode(enrForkId), - enrAttestationSubnetsField: SSZ.encode(metadata.attnets) + enrAttestationSubnetsField: SSZ.encode(metadata.attnets), + enrCustodySubnetCountField: SSZ.encode(metadata.custody_subnet_count) }, rng), discoveryEnabled: discovery, @@ -1841,6 +1842,31 @@ proc new(T: type Eth2Node, proc scoreCheck(peer: Peer): bool = peer.score >= PeerScoreLowLimit + proc fetchCustodyColumnCountFromRemotePeer(peer: Peer): + uint64 = + # Fetches the custody column count from a remote peer + # if the peer advertises their custody column count + # via the `csc` ENR field. If the peer does NOT, then + # the default value is assume, i.e, CUSTODY_REQUIREMENT + + let enrOpt = peer.enr + if enrOpt.isNone: + debug "Could not get ENR from peer", + peer_id = peer.peerId + return 0 + + else: + let + enr = enrOpt.get + enrFieldOpt = + enr.get(enrCustodySubnetCountField, uint64) + + if not enrFieldOpt.isOk: + debug "Issue with fetching `csc` field from ENR", + enr = enr + else: + return(enrFieldOpt.get) + proc onDeletePeer(peer: Peer) = peer.releasePeer() diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index 611c86e30..6d70c97b3 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -9,6 +9,8 @@ import std/[tables, heapqueue] import chronos +import + ../spec/datatypes/[eip7594] export tables @@ -27,7 +29,8 @@ type DuplicateError, ## Peer is already present in PeerPool. NoSpaceError, ## There no space for the peer in PeerPool. LowScoreError, ## Peer has too low score. - DeadPeerError ## Peer is already dead. + DeadPeerError, ## Peer is already dead. + LowCscError, ## Peer's custody subnet count is lower than minimum requirment. PeerItem[T] = object data: T @@ -283,6 +286,14 @@ proc checkPeerScore*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} = else: true +proc checkCscCount*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} = + ## Returns ``true`` if peer had MINIMUM_CUSTODY_REQUIREMENT + let csc = pool.fetchCustodyColumnCountFromRemotePeer(peer) + if csc >= CUSTODY_REQUIREMENT: + true + else: + false + proc peerCountChanged[A, B](pool: PeerPool[A, B]) = ## Call callback when number of peers changed. if not(isNil(pool.peerCounter)): @@ -380,6 +391,7 @@ proc checkPeer*[A, B](pool: PeerPool[A, B], peer: A): PeerStatus {.inline.} = ## * Positive value of peer's score - (PeerStatus.LowScoreError) ## * Peer's key is not present in PeerPool - (PeerStatus.DuplicateError) ## * Peer's lifetime future is not finished yet - (PeerStatus.DeadPeerError) + ## * Low value of peer's custody subnet count - (PeerStatus.LowCscError) ## ## If peer could be added to PeerPool procedure returns (PeerStatus.Success) mixin getKey, getFuture diff --git a/beacon_chain/networking/peer_protocol.nim b/beacon_chain/networking/peer_protocol.nim index 59efc8709..de04a749e 100644 --- a/beacon_chain/networking/peer_protocol.nim +++ b/beacon_chain/networking/peer_protocol.nim @@ -10,6 +10,7 @@ import chronicles, ../spec/network, + ../spec/datatypes/[eip7594], ".."/[beacon_clock], ../networking/eth2_network, ../consensus_object_pools/blockchain_dag, @@ -149,6 +150,16 @@ p2pProtocol PeerSync(version = 1, let ourStatus = peer.networkState.getCurrentStatus() theirStatus = await peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR) + + if incoming == true: + let + metadataRes = await peer.getMetadata_v3() + metadata = metadataRes.get + if metadata.custody_subnet_count < CUSTODY_REQUIREMENT: + debug "Custody requirement is lesser than min, peer disconnected", + peer, errorKind = theirStatus.error.kind + await peer.disconnect(FaultOrError) + else: discard if theirStatus.isOk: discard await peer.handleStatus(peer.networkState, theirStatus.get()) diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim index 2940670ec..e305ac71e 100644 --- a/beacon_chain/sync/request_manager.nim +++ b/beacon_chain/sync/request_manager.nim @@ -268,6 +268,31 @@ proc fetchBlobsFromNetwork(self: RequestManager, if not(isNil(peer)): self.network.peerPool.release(peer) +proc lookupCscFromPeer(peer: Peer): + uint64 = + # Fetches the custody column count from a remote peer + # if the peer advertises their custody column count + # via the `csc` ENR field. If the peer does NOT, then + # the default value is assume, i.e, CUSTODY_REQUIREMENT + + let enrOpt = peer.enr + if enrOpt.isNone: + debug "Could not get ENR from peer", + peer_id = peer.peerId + return 0 + + else: + let + enr = enrOpt.get + enrFieldOpt = + enr.get(enrCustodySubnetCountField, uint64) + + if not enrFieldOpt.isOk: + debug "Issue with fetching `csc` field from ENR", + enr = enr + else: + return(enrFieldOpt.get) + proc constructValidCustodyPeers(rman: RequestManager, peers: openArray[Peer]): seq[Peer] = @@ -288,7 +313,7 @@ proc constructValidCustodyPeers(rman: RequestManager, for peer in peers: # Get the custody subnet count of the remote peer let remoteCustodySubnetCount = - peer.fetchCustodyColumnCountFromRemotePeer() + peer.lookupCscFromPeer() # Extract remote peer's nodeID from peerID # Fetch custody columns from remote peer diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 325414518..dc2febfc3 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -326,81 +326,6 @@ func checkDataColumns(data_columns: seq[DataColumnSidecars]): Result[void, strin ? data_column_sidecar[].verify_data_column_sidecar_inclusion_proof() ok() -proc fetchCustodyColumnCountFromRemotePeer*(peer: Peer): - uint64 = - # Fetches the custody column count from a remote peer - # if the peer advertises their custody column count - # via the `csc` ENR field. If the peer does NOT, then - # the default value is assume, i.e, CUSTODY_REQUIREMENT - - let enrOpt = peer.enr - if enrOpt.isNone: - debug "Could not get ENR from peer", - peer_id = peer.peerId - return 0 - - else: - let - enr = enrOpt.get - enrFieldOpt = - enr.get(enrCustodySubnetCountField, uint64) - - if not enrFieldOpt.isOk: - debug "Issue with fetching `csc` field from ENR", - enr = enr - else: - return(enrFieldOpt.get) - -proc checkValidPeerCustody*[A, B](man: SyncManager[A, B], peer: A): bool = - # `constructValidaCustodyPeers` returns a list of peers that are an overall - # superset of the given local node's custody columns and the custody columns - # of the remote peers - - # Get the local custody subnet count - - let localCustodySubnetCount = - if man.supernode: - DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64 - else: - CUSTODY_REQUIREMENT - - # Get the local custody columns - let - localNodeId = peer.getNodeIdFromPeer() - localCustodyColumns = - localNodeId.get_custody_columns(localCustodySubnetCount).get - - var validPeerIds: seq[PeerId] - - # Get the custody subnet counts of the remote peer - let remoteCustodySubnetCount = - peer.fetchCustodyColumnCountFromRemotePeer() - - # Extract remote peer's nodeID from peerID - # Fetch custody columns fromm remote peer - let - remoteNodeId = peer.getNodeIdFromPeer() - remoteCustodyColumns = - remoteNodeId.get_custody_columns(remoteCustodySubnetCount).get - - # If the remote peer custodies less columns than our local node - # We skip it - if remoteCustodyColumns.len < localCustodyColumns.len: - return false - - # If the remote peer custodies all the possible columns - if remoteCustodyColumns.len == NUMBER_OF_COLUMNS: - return true - - # Filtering out the invalid peers - for column in localCustodyColumns: - if column notin remoteCustodyColumns: - return true - - # Otherwise add the peer to the set - # of valid peerIds - true - proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async: (raises: [CancelledError]).} = logScope: @@ -632,48 +557,46 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) let dataColumnData = if shouldGetDataColumns: - if man.checkValidPeerCustody(peer): - let data_columns = await man.getDataColumnSidecars(peer, req) - if data_columns.isErr(): - # peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - debug "Failed to receive data columns on request", - request = req, err = data_columns.error - return - let dataColumnData = data_columns.get().asSeq() - let dataColumnSmap = getShortMap(req, dataColumnData) - debug "Received data columns on request", data_columns_count = len(dataColumnData), - data_columns_map = dataColumnSmap, request = req + let data_columns = await man.getDataColumnSidecars(peer, req) + if data_columns.isErr(): + # peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + debug "Failed to receive data columns on request", + request = req, err = data_columns.error + return + let dataColumnData = data_columns.get().asSeq() + let dataColumnSmap = getShortMap(req, dataColumnData) + debug "Received data columns on request", data_columns_count = len(dataColumnData), + data_columns_map = dataColumnSmap, request = req - if len(dataColumnData) > 0: - let slots = mapIt(dataColumnData, it[].signed_block_header.message.slot) - let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]]) - if not(checkResponse(req, uniqueSlots)): - # peer.updateScore(PeerScoreBadResponse) - man.queue.push(req) - warn "Received data columns sequence is not in requested range", - data_columns_count = len(dataColumnData), data_columns_map = getShortMap(req, dataColumnData), - request = req - return - let groupedDataColumns = groupDataColumns(req, blockData, dataColumnData) - if groupedDataColumns.isErr(): - # peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - # warn "Received data columns is inconsistent", - # data_columns_map = getShortMap(req, dataColumnData), request = req, msg=groupedDataColumns.error() - return - if (let checkRes = groupedDataColumns.get.checkDataColumns(); checkRes.isErr): + if len(dataColumnData) > 0: + let slots = mapIt(dataColumnData, it[].signed_block_header.message.slot) + let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]]) + if not(checkResponse(req, uniqueSlots)): # peer.updateScore(PeerScoreBadResponse) man.queue.push(req) - warn "Received data columns is invalid", - data_columns_count = len(dataColumnData), - data_columns_map = getShortMap(req, dataColumnData), - request = req, - msg = checkRes.error + warn "Received data columns sequence is not in requested range", + data_columns_count = len(dataColumnData), data_columns_map = getShortMap(req, dataColumnData), + request = req return - Opt.some(groupedDataColumns.get()) - else: - Opt.none(seq[DataColumnSidecars]) + let groupedDataColumns = groupDataColumns(req, blockData, dataColumnData) + if groupedDataColumns.isErr(): + # peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + # warn "Received data columns is inconsistent", + # data_columns_map = getShortMap(req, dataColumnData), request = req, msg=groupedDataColumns.error() + return + if (let checkRes = groupedDataColumns.get.checkDataColumns(); checkRes.isErr): + # peer.updateScore(PeerScoreBadResponse) + man.queue.push(req) + warn "Received data columns is invalid", + data_columns_count = len(dataColumnData), + data_columns_map = getShortMap(req, dataColumnData), + request = req, + msg = checkRes.error + return + Opt.some(groupedDataColumns.get()) + else: Opt.none(seq[DataColumnSidecars])