refactor: sync manager to range request only valid peers if not supernode

parent fa5b15436c
commit 5265eeb6ea
@@ -28,7 +28,7 @@ import
   ".."/[version, conf, beacon_clock, conf_light_client],
   ../spec/datatypes/[phase0, altair, bellatrix, eip7594],
   ../spec/[eth2_ssz_serialization, network,
-           helpers, eip7594_helpers, forks],
+           helpers, forks],
   ../validators/keystore_management,
   "."/[eth2_discovery, eth2_protocol_dsl, libp2p_json_serialization, peer_pool, peer_scores]
@@ -1634,93 +1634,11 @@ proc getNodeIdFromPeer*(peer: Peer): NodeId=
   # Convert peer id to node id by extracting the peer's public key
   let nodeId =
     block:
-      var key: PublicKey
+      var key: eth2_network.PublicKey
       discard peer.peerId.extractPublicKey(key)
       keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId()
   nodeId

-proc fetchCustodyColumnCountFromRemotePeer*(node: Eth2Node,
-                                            pid: PeerId):
-                                            uint64 =
-  # Fetches the custody column count from a remote peer
-  # if the peer advertises their custody column count
-  # via the `csc` ENR field. If the peer does NOT, then
-  # the default value is assume, i.e, CUSTODY_REQUIREMENT
-  let
-    eth2node = node
-    peer = eth2node.getPeer(pid)
-
-  let enrOpt = peer.enr
-  if enrOpt.isNone:
-    debug "Could not get ENR from peer",
-          peer_id = pid
-    return 0
-
-  else:
-    let
-      enr = enrOpt.get
-      enrFieldOpt =
-        enr.get(enrCustodySubnetCountField, uint64)
-
-    if not enrFieldOpt.isOk:
-      debug "Issue with fetching `csc` field from ENR",
-            enr = enr
-    else:
-      return(enrFieldOpt.get)
-
-proc constructValidCustodyPeers*(node: Eth2Node,
-                                 peers: openArray[Peer],
-                                 config: BeaconNodeConf):
-                                 seq[PeerId] =
-  # `constructValidaCustodyPeers` returns a list of peers that are an overall
-  # superset of the given local node's custody columns and the custody columns
-  # of the remote peers
-
-  # Get the local custody subnet count
-  var localCustodySubnetCount: uint64
-  if config.subscribeAllSubnets:
-    localCustodySubnetCount = DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
-  localCustodySubnetCount = CUSTODY_REQUIREMENT
-
-  # Get the local custody columns
-  let
-    localNodeId = node.nodeId
-    localCustodyColumns =
-      localNodeId.get_custody_columns(localCustodySubnetCount).get
-
-  var validPeerIds = newSeqOfCap[PeerId](peers.len)
-
-  for peer in peers:
-    # Get the custody subnet counts of the remote peer
-    let remoteCustodySubnetCount =
-      node.fetchCustodyColumnCountFromRemotePeer(peer.peerId)
-
-    # Extract remote peer's nodeID from peerID
-    # Fetch custody columns fromm remote peer
-    let
-      remoteNodeId = getNodeIdFromPeer(peer)
-      remoteCustodyColumns =
-        remoteNodeId.get_custody_columns(remoteCustodySubnetCount).get
-
-    # If the remote peer custodies less columns than our local node
-    # We skip it
-    if remoteCustodyColumns.len < localCustodyColumns.len:
-      continue
-
-    # If the remote peer custodies all the possible columns
-    if remoteCustodyColumns.len == NUMBER_OF_COLUMNS:
-      validPeerIds.add(peer.peerId)
-
-    # Filtering out the invalid peers
-    for column in localCustodyColumns:
-      if column notin remoteCustodyColumns:
-        continue
-
-    # Otherwise add the peer to the set
-    # of valid peerIds
-    validPeerIds.add(peer.peerId)
-  validPeerIds
-
 proc resolvePeer(peer: Peer) =
   # Resolve task which performs searching of peer's public key and recovery of
   # ENR using discovery5. We only resolve ENR for peers we know about to avoid
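For context, the helper removed above reads an optional `csc` entry from the peer's ENR; when no ENR or no `csc` field is available, the committed body falls through to 0 (its doc comment mentions CUSTODY_REQUIREMENT as the intended default). A minimal, self-contained model of that lookup, using a plain table as a stand-in for the real ENR type; MockEnr and the sample values are illustrative assumptions, not the nimbus-eth2 API:

# A minimal sketch of the lookup being removed here (and re-added in
# sync_manager below). MockEnr and the literal values are stand-ins,
# not the real eth2_discovery/Eth2Node types.
import std/[options, tables]

type MockEnr = Table[string, uint64]

proc custodyColumnCount(enr: Option[MockEnr]): uint64 =
  # No ENR known for the peer: the committed code logs and returns 0.
  if enr.isNone:
    return 0
  # `csc` advertised: use it; otherwise fall back to 0, as the committed body does.
  enr.get().getOrDefault("csc", 0'u64)

when isMainModule:
  echo custodyColumnCount(some({"csc": 128'u64}.toTable))  # -> 128
  echo custodyColumnCount(none(MockEnr))                   # -> 0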
@@ -462,22 +462,25 @@ proc initFullNode(
       validatorChangePool, node.attachedValidators, syncCommitteeMsgPool,
       lightClientPool, quarantine, blobQuarantine, dataColumnQuarantine,
       rng, getBeaconTime, taskpool)
+    router = (ref MessageRouter)(
+      processor: processor,
+      network: node.network)
+
+    var supernode = node.config.subscribeAllSubnets
+    let
     syncManager = newSyncManager[Peer, PeerId](
       node.network.peerPool,
       dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS,
-      SyncQueueKind.Forward, getLocalHeadSlot,
+      supernode, SyncQueueKind.Forward, getLocalHeadSlot,
       getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
       getFrontfillSlot, dag.tail.slot, blockVerifier)
     backfiller = newSyncManager[Peer, PeerId](
       node.network.peerPool,
       dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS,
-      SyncQueueKind.Backward, getLocalHeadSlot,
+      supernode, SyncQueueKind.Backward, getLocalHeadSlot,
       getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
       getFrontfillSlot, dag.backfill.slot, blockVerifier,
       maxHeadAge = 0)
-    router = (ref MessageRouter)(
-      processor: processor,
-      network: node.network)
     requestManager = RequestManager.init(
       node.network, dag.cfg.DENEB_FORK_EPOCH, getBeaconTime,
       (proc(): bool = syncManager.inProgress),
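The `supernode` flag handed to both managers comes from `subscribeAllSubnets`; conceptually it selects how many data-column subnets the local node custodies. A rough sketch of that mapping, with assumed constant values (the real ones come from the consensus spec presets):

# Sketch only: how a supernode flag would map to a custody subnet count.
const
  DATA_COLUMN_SIDECAR_SUBNET_COUNT = 128'u64  # assumed value, illustrative
  CUSTODY_REQUIREMENT = 4'u64                 # assumed value, illustrative

func localCustodySubnetCount(supernode: bool): uint64 =
  if supernode:
    DATA_COLUMN_SIDECAR_SUBNET_COUNT  # subscribe-all-subnets nodes custody every subnet
  else:
    CUSTODY_REQUIREMENT               # everyone else custodies only the minimum

when isMainModule:
  assert localCustodySubnetCount(true) == 128'u64
  assert localCustodySubnetCount(false) == 4'u64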
@@ -52,7 +52,9 @@ type
     MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: uint64
     MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS: uint64
     responseTimeout: chronos.Duration
+    supernode: bool
     maxHeadAge: uint64
+    network: Eth2Node
     getLocalHeadSlot: GetSlotCallback
     getLocalWallSlot: GetSlotCallback
     getSafeSlot: GetSlotCallback
@@ -121,6 +123,7 @@ proc initQueue[A, B](man: SyncManager[A, B]) =
 proc newSyncManager*[A, B](pool: PeerPool[A, B],
                            denebEpoch: Epoch,
                            minEpochsForBlobSidecarsRequests: uint64,
+                           supernode: bool,
                            direction: SyncQueueKind,
                            getLocalHeadSlotCb: GetSlotCallback,
                            getLocalWallSlotCb: GetSlotCallback,
|
@ -144,6 +147,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
|
||||||
pool: pool,
|
pool: pool,
|
||||||
DENEB_FORK_EPOCH: denebEpoch,
|
DENEB_FORK_EPOCH: denebEpoch,
|
||||||
MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: minEpochsForBlobSidecarsRequests,
|
MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: minEpochsForBlobSidecarsRequests,
|
||||||
|
supernode: supernode,
|
||||||
getLocalHeadSlot: getLocalHeadSlotCb,
|
getLocalHeadSlot: getLocalHeadSlotCb,
|
||||||
getLocalWallSlot: getLocalWallSlotCb,
|
getLocalWallSlot: getLocalWallSlotCb,
|
||||||
getSafeSlot: getSafeSlot,
|
getSafeSlot: getSafeSlot,
|
||||||
|
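The three sync_manager hunks above thread the flag through the type, the constructor signature, and the constructor body. A compressed generic illustration of that pattern, with a stand-in type rather than the real SyncManager[A, B]:

# Minimal sketch of threading a `supernode` flag from constructor into object.
type
  MiniSyncManager = object
    supernode: bool     # new field (type section hunk)
    maxHeadAge: uint64

proc newMiniSyncManager(supernode: bool,   # new parameter (signature hunk)
                        maxHeadAge: uint64): MiniSyncManager =
  MiniSyncManager(
    supernode: supernode,                  # new field assignment (constructor hunk)
    maxHeadAge: maxHeadAge)

when isMainModule:
  doAssert newMiniSyncManager(true, 0).supernode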
@@ -323,6 +327,83 @@ func checkDataColumns(data_columns: seq[DataColumnSidecars]): Result[void, string] =
     ? data_column_sidecar[].verify_data_column_sidecar_inclusion_proof()
   ok()

+proc fetchCustodyColumnCountFromRemotePeer*(peer: Peer):
+                                            uint64 =
+  # Fetches the custody column count from a remote peer
+  # if the peer advertises their custody column count
+  # via the `csc` ENR field. If the peer does NOT, then
+  # the default value is assume, i.e, CUSTODY_REQUIREMENT
+
+  let enrOpt = peer.enr
+  if enrOpt.isNone:
+    debug "Could not get ENR from peer",
+          peer_id = peer.peerId
+    return 0
+
+  else:
+    let
+      enr = enrOpt.get
+      enrFieldOpt =
+        enr.get(enrCustodySubnetCountField, uint64)
+
+    if not enrFieldOpt.isOk:
+      debug "Issue with fetching `csc` field from ENR",
+            enr = enr
+    else:
+      return(enrFieldOpt.get)
+
+proc checkValidPeerCustody*[A, B](man: SyncManager[A, B], peer: A): bool =
+  # `constructValidaCustodyPeers` returns a list of peers that are an overall
+  # superset of the given local node's custody columns and the custody columns
+  # of the remote peers
+
+  # Get the local custody subnet count
+  var localCustodySubnetCount: uint64
+  if man.supernode:
+    localCustodySubnetCount = DATA_COLUMN_SIDECAR_SUBNET_COUNT.uint64
+  else:
+    localCustodySubnetCount = CUSTODY_REQUIREMENT
+
+  # Get the local custody columns
+  let
+    localNodeId = peer.getNodeIdFromPeer()
+    localCustodyColumns =
+      localNodeId.get_custody_columns(localCustodySubnetCount).get
+
+  var validPeerIds: seq[PeerId]
+
+  # Get the custody subnet counts of the remote peer
+  let remoteCustodySubnetCount =
+    peer.fetchCustodyColumnCountFromRemotePeer()
+
+  # Extract remote peer's nodeID from peerID
+  # Fetch custody columns fromm remote peer
+  let
+    remoteNodeId = peer.getNodeIdFromPeer()
+    remoteCustodyColumns =
+      remoteNodeId.get_custody_columns(remoteCustodySubnetCount).get
+
+  # If the remote peer custodies less columns than our local node
+  # We skip it
+  if remoteCustodyColumns.len < localCustodyColumns.len:
+    return false
+
+  # If the remote peer custodies all the possible columns
+  if remoteCustodyColumns.len == NUMBER_OF_COLUMNS:
+    return true
+
+  # Filtering out the invalid peers
+  for column in localCustodyColumns:
+    if column notin remoteCustodyColumns:
+      return true
+
+  # Otherwise add the peer to the set
+  # of valid peerIds
+  # validPeerIds.add(peer.peerId)
+  # validPeerIds
+  return true
+
 proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
     {.async: (raises: [CancelledError]).} =
   logScope:
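Per its comments, the added `checkValidPeerCustody` is meant to decide whether the remote peer's custody columns cover the local node's. A standalone sketch of that superset decision using plain hash sets; the column indices, NUMBER_OF_COLUMNS value and sample data are placeholders, not the real spec helpers:

# Self-contained sketch of the peer-custody decision with HashSets of
# column indices. Values are illustrative.
import std/sets

const NUMBER_OF_COLUMNS = 128

func custodyCoversUs(localColumns, remoteColumns: HashSet[uint64]): bool =
  # A peer custodying every column can always serve us.
  if remoteColumns.len == NUMBER_OF_COLUMNS:
    return true
  # A peer custodying fewer columns than we do can never be a superset.
  if remoteColumns.len < localColumns.len:
    return false
  # Otherwise require every locally custodied column to also be
  # custodied by the remote peer.
  localColumns <= remoteColumns

when isMainModule:
  let ours = [0'u64, 7, 21, 42].toHashSet
  doAssert custodyCoversUs(ours, [0'u64, 7, 21, 42, 99].toHashSet)
  doAssert not custodyCoversUs(ours, [0'u64, 7].toHashSet)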
|
@ -548,51 +629,54 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
|
||||||
withBlck(blck[]):
|
withBlck(blck[]):
|
||||||
when consensusFork >= ConsensusFork.Deneb:
|
when consensusFork >= ConsensusFork.Deneb:
|
||||||
if forkyBlck.message.body.blob_kzg_commitments.len > 0:
|
if forkyBlck.message.body.blob_kzg_commitments.len > 0:
|
||||||
hasColumns = true
|
hasColumns = true
|
||||||
break
|
break
|
||||||
hasColumns
|
hasColumns
|
||||||
|
|
||||||
let dataColumnData =
|
let dataColumnData =
|
||||||
if shouldGetDataColumns:
|
if shouldGetDataColumns:
|
||||||
let data_columns = await man.getDataColumnSidecars(peer, req)
|
if man.checkValidPeerCustody(peer):
|
||||||
if data_columns.isErr():
|
let data_columns = await man.getDataColumnSidecars(peer, req)
|
||||||
# peer.updateScore(PeerScoreNoValues)
|
if data_columns.isErr():
|
||||||
man.queue.push(req)
|
# peer.updateScore(PeerScoreNoValues)
|
||||||
debug "Failed to receive data columns on request",
|
man.queue.push(req)
|
||||||
request = req, err = data_columns.error
|
debug "Failed to receive data columns on request",
|
||||||
return
|
request = req, err = data_columns.error
|
||||||
let dataColumnData = data_columns.get().asSeq()
|
return
|
||||||
let dataColumnSmap = getShortMap(req, dataColumnData)
|
let dataColumnData = data_columns.get().asSeq()
|
||||||
debug "Received data columns on request", data_columns_count = len(dataColumnData),
|
let dataColumnSmap = getShortMap(req, dataColumnData)
|
||||||
data_columns_map = dataColumnSmap, request = req
|
debug "Received data columns on request", data_columns_count = len(dataColumnData),
|
||||||
|
data_columns_map = dataColumnSmap, request = req
|
||||||
|
|
||||||
if len(dataColumnData) > 0:
|
if len(dataColumnData) > 0:
|
||||||
let slots = mapIt(dataColumnData, it[].signed_block_header.message.slot)
|
let slots = mapIt(dataColumnData, it[].signed_block_header.message.slot)
|
||||||
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
|
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
|
||||||
if not(checkResponse(req, uniqueSlots)):
|
if not(checkResponse(req, uniqueSlots)):
|
||||||
|
# peer.updateScore(PeerScoreBadResponse)
|
||||||
|
man.queue.push(req)
|
||||||
|
warn "Received data columns sequence is not in requested range",
|
||||||
|
data_columns_count = len(dataColumnData), data_columns_map = getShortMap(req, dataColumnData),
|
||||||
|
request = req
|
||||||
|
return
|
||||||
|
let groupedDataColumns = groupDataColumns(req, blockData, dataColumnData)
|
||||||
|
if groupedDataColumns.isErr():
|
||||||
|
# peer.updateScore(PeerScoreNoValues)
|
||||||
|
man.queue.push(req)
|
||||||
|
# warn "Received data columns is inconsistent",
|
||||||
|
# data_columns_map = getShortMap(req, dataColumnData), request = req, msg=groupedDataColumns.error()
|
||||||
|
return
|
||||||
|
if (let checkRes = groupedDataColumns.get.checkDataColumns(); checkRes.isErr):
|
||||||
# peer.updateScore(PeerScoreBadResponse)
|
# peer.updateScore(PeerScoreBadResponse)
|
||||||
man.queue.push(req)
|
man.queue.push(req)
|
||||||
warn "Received data columns sequence is not in requested range",
|
warn "Received data columns is invalid",
|
||||||
data_columns_count = len(dataColumnData), data_columns_map = getShortMap(req, dataColumnData),
|
data_columns_count = len(dataColumnData),
|
||||||
request = req
|
data_columns_map = getShortMap(req, dataColumnData),
|
||||||
|
request = req,
|
||||||
|
msg = checkRes.error
|
||||||
return
|
return
|
||||||
let groupedDataColumns = groupDataColumns(req, blockData, dataColumnData)
|
Opt.some(groupedDataColumns.get())
|
||||||
if groupedDataColumns.isErr():
|
else:
|
||||||
# peer.updateScore(PeerScoreNoValues)
|
Opt.none(seq[DataColumnSidecars])
|
||||||
man.queue.push(req)
|
|
||||||
# warn "Received data columns is inconsistent",
|
|
||||||
# data_columns_map = getShortMap(req, dataColumnData), request = req, msg=groupedDataColumns.error()
|
|
||||||
return
|
|
||||||
if (let checkRes = groupedDataColumns.get.checkDataColumns(); checkRes.isErr):
|
|
||||||
# peer.updateScore(PeerScoreBadResponse)
|
|
||||||
man.queue.push(req)
|
|
||||||
warn "Received data columns is invalid",
|
|
||||||
data_columns_count = len(dataColumnData),
|
|
||||||
data_columns_map = getShortMap(req, dataColumnData),
|
|
||||||
request = req,
|
|
||||||
msg = checkRes.error
|
|
||||||
return
|
|
||||||
Opt.some(groupedDataColumns.get())
|
|
||||||
else:
|
else:
|
||||||
Opt.none(seq[DataColumnSidecars])
|
Opt.none(seq[DataColumnSidecars])
|
||||||
|
|
||||||
|
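The net effect of this syncStep change is to gate the data-column range request on the custody check, yielding Opt.none when the peer is not eligible so the request can go to another peer. A simplified synchronous model of that control flow, with stand-in types in place of the real Peer and sidecar types:

# Simplified model of the new control flow: only range-request data columns
# from a peer that passes the custody check. All types here are stand-ins.
import std/options

type
  FakePeer = object
    validCustody: bool
  DataColumn = object
    index: uint64

proc checkValidPeerCustody(peer: FakePeer): bool =
  peer.validCustody

proc getDataColumnSidecars(peer: FakePeer): seq[DataColumn] =
  # Stand-in for the networked range request.
  @[DataColumn(index: 0), DataColumn(index: 1)]

proc fetchColumnsIfEligible(peer: FakePeer): Option[seq[DataColumn]] =
  if checkValidPeerCustody(peer):
    some(getDataColumnSidecars(peer))
  else:
    none(seq[DataColumn])

when isMainModule:
  doAssert fetchColumnsIfEligible(FakePeer(validCustody: true)).isSome
  doAssert fetchColumnsIfEligible(FakePeer(validCustody: false)).isNone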