diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index d3338ddba..4e3d6e593 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -28,7 +28,7 @@ import ".."/[version, conf, beacon_clock, conf_light_client], ../spec/datatypes/[phase0, altair, bellatrix, eip7594], ../spec/[eth2_ssz_serialization, network, - helpers, eip7594_helpers, forks], + helpers, forks], ../validators/keystore_management, "."/[eth2_discovery, eth2_protocol_dsl, libp2p_json_serialization, peer_pool, peer_scores] @@ -1634,93 +1634,11 @@ proc getNodeIdFromPeer*(peer: Peer): NodeId= # Convert peer id to node id by extracting the peer's public key let nodeId = block: - var key: PublicKey + var key: eth2_network.PublicKey discard peer.peerId.extractPublicKey(key) keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId() nodeId -proc fetchCustodyColumnCountFromRemotePeer*(node: Eth2Node, - pid: PeerId): - uint64 = - # Fetches the custody column count from a remote peer - # if the peer advertises their custody column count - # via the `csc` ENR field. If the peer does NOT, then - # the default value is assume, i.e, CUSTODY_REQUIREMENT - let - eth2node = node - peer = eth2node.getPeer(pid) - - let enrOpt = peer.enr - if enrOpt.isNone: - debug "Could not get ENR from peer", - peer_id = pid - return 0 - - else: - let - enr = enrOpt.get - enrFieldOpt = - enr.get(enrCustodySubnetCountField, uint64) - - if not enrFieldOpt.isOk: - debug "Issue with fetching `csc` field from ENR", - enr = enr - else: - return(enrFieldOpt.get) - -proc constructValidCustodyPeers*(node: Eth2Node, - peers: openArray[Peer], - config: BeaconNodeConf): - seq[PeerId] = - # `constructValidaCustodyPeers` returns a list of peers that are an overall - # superset of the given local node's custody columns and the custody columns - # of the remote peers - - # Get the local custody subnet count - var localCustodySubnetCount: uint64 - if config.subscribeAllSubnets: - localCustodySubnetCount = DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64 - localCustodySubnetCount = CUSTODY_REQUIREMENT - - # Get the local custody columns - let - localNodeId = node.nodeId - localCustodyColumns = - localNodeId.get_custody_columns(localCustodySubnetCount).get - - var validPeerIds = newSeqOfCap[PeerId](peers.len) - - for peer in peers: - # Get the custody subnet counts of the remote peer - let remoteCustodySubnetCount = - node.fetchCustodyColumnCountFromRemotePeer(peer.peerId) - - # Extract remote peer's nodeID from peerID - # Fetch custody columns fromm remote peer - let - remoteNodeId = getNodeIdFromPeer(peer) - remoteCustodyColumns = - remoteNodeId.get_custody_columns(remoteCustodySubnetCount).get - - # If the remote peer custodies less columns than our local node - # We skip it - if remoteCustodyColumns.len < localCustodyColumns.len: - continue - - # If the remote peer custodies all the possible columns - if remoteCustodyColumns.len == NUMBER_OF_COLUMNS: - validPeerIds.add(peer.peerId) - - # Filtering out the invalid peers - for column in localCustodyColumns: - if column notin remoteCustodyColumns: - continue - - # Otherwise add the peer to the set - # of valid peerIds - validPeerIds.add(peer.peerId) - validPeerIds - proc resolvePeer(peer: Peer) = # Resolve task which performs searching of peer's public key and recovery of # ENR using discovery5. We only resolve ENR for peers we know about to avoid diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index f5144ec68..5d098f817 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -462,22 +462,25 @@ proc initFullNode( validatorChangePool, node.attachedValidators, syncCommitteeMsgPool, lightClientPool, quarantine, blobQuarantine, dataColumnQuarantine, rng, getBeaconTime, taskpool) + router = (ref MessageRouter)( + processor: processor, + network: node.network) + + var supernode = node.config.subscribeAllSubnets + let syncManager = newSyncManager[Peer, PeerId]( node.network.peerPool, dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, - SyncQueueKind.Forward, getLocalHeadSlot, + supernode, SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, getFrontfillSlot, dag.tail.slot, blockVerifier) backfiller = newSyncManager[Peer, PeerId]( node.network.peerPool, dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, - SyncQueueKind.Backward, getLocalHeadSlot, + supernode, SyncQueueKind.Backward, getLocalHeadSlot, getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, getFrontfillSlot, dag.backfill.slot, blockVerifier, - maxHeadAge = 0) - router = (ref MessageRouter)( - processor: processor, - network: node.network) + maxHeadAge = 0) requestManager = RequestManager.init( node.network, dag.cfg.DENEB_FORK_EPOCH, getBeaconTime, (proc(): bool = syncManager.inProgress), diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index bef03faf9..2d50f2181 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -52,7 +52,9 @@ type MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: uint64 MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS: uint64 responseTimeout: chronos.Duration + supernode: bool maxHeadAge: uint64 + network: Eth2Node getLocalHeadSlot: GetSlotCallback getLocalWallSlot: GetSlotCallback getSafeSlot: GetSlotCallback @@ -121,6 +123,7 @@ proc initQueue[A, B](man: SyncManager[A, B]) = proc newSyncManager*[A, B](pool: PeerPool[A, B], denebEpoch: Epoch, minEpochsForBlobSidecarsRequests: uint64, + supernode: bool, direction: SyncQueueKind, getLocalHeadSlotCb: GetSlotCallback, getLocalWallSlotCb: GetSlotCallback, @@ -144,6 +147,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], pool: pool, DENEB_FORK_EPOCH: denebEpoch, MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: minEpochsForBlobSidecarsRequests, + supernode: supernode, getLocalHeadSlot: getLocalHeadSlotCb, getLocalWallSlot: getLocalWallSlotCb, getSafeSlot: getSafeSlot, @@ -323,6 +327,83 @@ func checkDataColumns(data_columns: seq[DataColumnSidecars]): Result[void, strin ? data_column_sidecar[].verify_data_column_sidecar_inclusion_proof() ok() +proc fetchCustodyColumnCountFromRemotePeer*(peer: Peer): + uint64 = + # Fetches the custody column count from a remote peer + # if the peer advertises their custody column count + # via the `csc` ENR field. If the peer does NOT, then + # the default value is assume, i.e, CUSTODY_REQUIREMENT + + let enrOpt = peer.enr + if enrOpt.isNone: + debug "Could not get ENR from peer", + peer_id = peer.peerId + return 0 + + else: + let + enr = enrOpt.get + enrFieldOpt = + enr.get(enrCustodySubnetCountField, uint64) + + if not enrFieldOpt.isOk: + debug "Issue with fetching `csc` field from ENR", + enr = enr + else: + return(enrFieldOpt.get) + +proc checkValidPeerCustody*[A, B](man: SyncManager[A, B], peer: A): bool = + # `constructValidaCustodyPeers` returns a list of peers that are an overall + # superset of the given local node's custody columns and the custody columns + # of the remote peers + + # Get the local custody subnet count + var localCustodySubnetCount: uint64 + if man.supernode: + localCustodySubnetCount = DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64 + else: + localCustodySubnetCount = CUSTODY_REQUIREMENT + + # Get the local custody columns + let + localNodeId = peer.getNodeIdFromPeer() + localCustodyColumns = + localNodeId.get_custody_columns(localCustodySubnetCount).get + + var validPeerIds: seq[PeerId] + + + # Get the custody subnet counts of the remote peer + let remoteCustodySubnetCount = + peer.fetchCustodyColumnCountFromRemotePeer() + + # Extract remote peer's nodeID from peerID + # Fetch custody columns fromm remote peer + let + remoteNodeId = peer.getNodeIdFromPeer() + remoteCustodyColumns = + remoteNodeId.get_custody_columns(remoteCustodySubnetCount).get + + # If the remote peer custodies less columns than our local node + # We skip it + if remoteCustodyColumns.len < localCustodyColumns.len: + return false + + # If the remote peer custodies all the possible columns + if remoteCustodyColumns.len == NUMBER_OF_COLUMNS: + return true + + # Filtering out the invalid peers + for column in localCustodyColumns: + if column notin remoteCustodyColumns: + return true + + # Otherwise add the peer to the set + # of valid peerIds + # validPeerIds.add(peer.peerId) + # validPeerIds + return true + proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async: (raises: [CancelledError]).} = logScope: @@ -548,51 +629,54 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) withBlck(blck[]): when consensusFork >= ConsensusFork.Deneb: if forkyBlck.message.body.blob_kzg_commitments.len > 0: - hasColumns = true + hasColumns = true break hasColumns let dataColumnData = if shouldGetDataColumns: - let data_columns = await man.getDataColumnSidecars(peer, req) - if data_columns.isErr(): - # peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - debug "Failed to receive data columns on request", - request = req, err = data_columns.error - return - let dataColumnData = data_columns.get().asSeq() - let dataColumnSmap = getShortMap(req, dataColumnData) - debug "Received data columns on request", data_columns_count = len(dataColumnData), - data_columns_map = dataColumnSmap, request = req + if man.checkValidPeerCustody(peer): + let data_columns = await man.getDataColumnSidecars(peer, req) + if data_columns.isErr(): + # peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + debug "Failed to receive data columns on request", + request = req, err = data_columns.error + return + let dataColumnData = data_columns.get().asSeq() + let dataColumnSmap = getShortMap(req, dataColumnData) + debug "Received data columns on request", data_columns_count = len(dataColumnData), + data_columns_map = dataColumnSmap, request = req - if len(dataColumnData) > 0: - let slots = mapIt(dataColumnData, it[].signed_block_header.message.slot) - let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]]) - if not(checkResponse(req, uniqueSlots)): + if len(dataColumnData) > 0: + let slots = mapIt(dataColumnData, it[].signed_block_header.message.slot) + let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]]) + if not(checkResponse(req, uniqueSlots)): + # peer.updateScore(PeerScoreBadResponse) + man.queue.push(req) + warn "Received data columns sequence is not in requested range", + data_columns_count = len(dataColumnData), data_columns_map = getShortMap(req, dataColumnData), + request = req + return + let groupedDataColumns = groupDataColumns(req, blockData, dataColumnData) + if groupedDataColumns.isErr(): + # peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + # warn "Received data columns is inconsistent", + # data_columns_map = getShortMap(req, dataColumnData), request = req, msg=groupedDataColumns.error() + return + if (let checkRes = groupedDataColumns.get.checkDataColumns(); checkRes.isErr): # peer.updateScore(PeerScoreBadResponse) man.queue.push(req) - warn "Received data columns sequence is not in requested range", - data_columns_count = len(dataColumnData), data_columns_map = getShortMap(req, dataColumnData), - request = req + warn "Received data columns is invalid", + data_columns_count = len(dataColumnData), + data_columns_map = getShortMap(req, dataColumnData), + request = req, + msg = checkRes.error return - let groupedDataColumns = groupDataColumns(req, blockData, dataColumnData) - if groupedDataColumns.isErr(): - # peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - # warn "Received data columns is inconsistent", - # data_columns_map = getShortMap(req, dataColumnData), request = req, msg=groupedDataColumns.error() - return - if (let checkRes = groupedDataColumns.get.checkDataColumns(); checkRes.isErr): - # peer.updateScore(PeerScoreBadResponse) - man.queue.push(req) - warn "Received data columns is invalid", - data_columns_count = len(dataColumnData), - data_columns_map = getShortMap(req, dataColumnData), - request = req, - msg = checkRes.error - return - Opt.some(groupedDataColumns.get()) + Opt.some(groupedDataColumns.get()) + else: + Opt.none(seq[DataColumnSidecars]) else: Opt.none(seq[DataColumnSidecars])