From 9dacfed943ac259218c9dd94b855a432f84b4999 Mon Sep 17 00:00:00 2001 From: andri lim Date: Tue, 6 Aug 2024 11:26:55 +0700 Subject: [PATCH] Disable txpool in eth wire protocol handler (#2540) --- nimbus/beacon/api_handler/api_getbodies.nim | 2 +- nimbus/rpc/experimental.nim | 2 +- nimbus/sync/handlers/eth.nim | 457 +++++++++----------- nimbus/sync/handlers/setup.nim | 11 - 4 files changed, 200 insertions(+), 272 deletions(-) diff --git a/nimbus/beacon/api_handler/api_getbodies.nim b/nimbus/beacon/api_handler/api_getbodies.nim index d38495476..407f88a76 100644 --- a/nimbus/beacon/api_handler/api_getbodies.nim +++ b/nimbus/beacon/api_handler/api_getbodies.nim @@ -23,7 +23,7 @@ const proc getPayloadBodyByHeader(db: CoreDbRef, header: common.BlockHeader, - output: var seq[Opt[ExecutionPayloadBodyV1]]) = + output: var seq[Opt[ExecutionPayloadBodyV1]]) {.gcsafe, raises:[].} = var body: common.BlockBody if not db.getBlockBody(header, body): diff --git a/nimbus/rpc/experimental.nim b/nimbus/rpc/experimental.nim index 280c33db8..27926ca77 100644 --- a/nimbus/rpc/experimental.nim +++ b/nimbus/rpc/experimental.nim @@ -35,7 +35,7 @@ proc getMultiKeys*( com: CommonRef, blockHeader: BlockHeader, statePostExecution: bool): MultiKeysRef - {.raises: [RlpError, BlockNotFound, ValueError].} = + {.raises: [BlockNotFound, ValueError].} = let chainDB = com.db diff --git a/nimbus/sync/handlers/eth.nim b/nimbus/sync/handlers/eth.nim index 0429d5107..ba3d74ea2 100644 --- a/nimbus/sync/handlers/eth.nim +++ b/nimbus/sync/handlers/eth.nim @@ -11,15 +11,14 @@ {.push raises: [].} import - std/[tables, times, hashes, sets, sequtils], + std/[tables, times, hashes, sets], chronicles, chronos, stew/endians2, eth/p2p, eth/p2p/peer_pool, ".."/[types, protocol], ../protocol/eth/eth_types, - ../protocol/trace_config, # gossip noise control - ../../core/[chain, tx_pool, tx_pool/tx_item] + ../../core/[chain, tx_pool] logScope: topics = "eth-wire" @@ -27,47 +26,22 @@ logScope: type HashToTime = TableRef[Hash256, Time] - NewBlockHandler* = proc( - arg: pointer, - peer: Peer, - blk: EthBlock, - totalDifficulty: DifficultyInt) {. - gcsafe, raises: [CatchableError].} - - NewBlockHashesHandler* = proc( - arg: pointer, - peer: Peer, - hashes: openArray[NewBlockHashesAnnounce]) {. - gcsafe, raises: [CatchableError].} - - NewBlockHandlerPair = object - arg: pointer - handler: NewBlockHandler - - NewBlockHashesHandlerPair = object - arg: pointer - handler: NewBlockHashesHandler - - EthWireRunState = enum - Enabled - Suspended - NotAvailable - EthWireRef* = ref object of EthWireBase db: CoreDbRef chain: ChainRef txPool: TxPoolRef peerPool: PeerPool - enableTxPool: EthWireRunState knownByPeer: Table[Peer, HashToTime] pending: HashSet[Hash256] lastCleanup: Time - newBlockHandler: NewBlockHandlerPair - newBlockHashesHandler: NewBlockHashesHandlerPair const - NUM_PEERS_REBROADCAST_QUOTIENT = 4 - POOLED_STORAGE_TIME_LIMIT = initDuration(minutes = 20) + txpool_enabled = defined(enable_txpool_in_synchronizer) + +when txpool_enabled: + const + NUM_PEERS_REBROADCAST_QUOTIENT = 4 + POOLED_STORAGE_TIME_LIMIT = initDuration(minutes = 20) # ------------------------------------------------------------------------------ # Private functions: helper functions @@ -110,169 +84,169 @@ proc blockHeader(db: CoreDbRef, proc hash(peer: Peer): hashes.Hash {.used.} = hash(peer.remote) -proc getPeers(ctx: EthWireRef, thisPeer: Peer): seq[Peer] = - # do not send back tx or txhash to thisPeer - for peer in peers(ctx.peerPool): - if peer != thisPeer: - result.add peer +when txpool_enabled: + proc getPeers(ctx: EthWireRef, thisPeer: Peer): seq[Peer] = + # do not send back tx or txhash to thisPeer + for peer in peers(ctx.peerPool): + if peer != thisPeer: + result.add peer -proc cleanupKnownByPeer(ctx: EthWireRef) = - let now = getTime() - var tmp = HashSet[Hash256]() - for _, map in ctx.knownByPeer: - for hash, time in map: - if time - now >= POOLED_STORAGE_TIME_LIMIT: - tmp.incl hash - for hash in tmp: - map.del(hash) - tmp.clear() + proc cleanupKnownByPeer(ctx: EthWireRef) = + let now = getTime() + var tmp = HashSet[Hash256]() + for _, map in ctx.knownByPeer: + for hash, time in map: + if time - now >= POOLED_STORAGE_TIME_LIMIT: + tmp.incl hash + for hash in tmp: + map.del(hash) + tmp.clear() - var tmpPeer = HashSet[Peer]() - for peer, map in ctx.knownByPeer: - if map.len == 0: - tmpPeer.incl peer + var tmpPeer = HashSet[Peer]() + for peer, map in ctx.knownByPeer: + if map.len == 0: + tmpPeer.incl peer - for peer in tmpPeer: - ctx.knownByPeer.del peer + for peer in tmpPeer: + ctx.knownByPeer.del peer - ctx.lastCleanup = now + ctx.lastCleanup = now -proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash256], peer: Peer) = - var map: HashToTime - ctx.knownByPeer.withValue(peer, val) do: - map = val[] - do: - map = newTable[Hash256, Time]() - ctx.knownByPeer[peer] = map + proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash256], peer: Peer) = + var map: HashToTime + ctx.knownByPeer.withValue(peer, val) do: + map = val[] + do: + map = newTable[Hash256, Time]() + ctx.knownByPeer[peer] = map - for txHash in txHashes: - if txHash notin map: - map[txHash] = getTime() + for txHash in txHashes: + if txHash notin map: + map[txHash] = getTime() -proc addToKnownByPeer(ctx: EthWireRef, - txHashes: openArray[Hash256], - peer: Peer, - newHashes: var seq[Hash256]) = - var map: HashToTime - ctx.knownByPeer.withValue(peer, val) do: - map = val[] - do: - map = newTable[Hash256, Time]() - ctx.knownByPeer[peer] = map + proc addToKnownByPeer(ctx: EthWireRef, + txHashes: openArray[Hash256], + peer: Peer, + newHashes: var seq[Hash256]) = + var map: HashToTime + ctx.knownByPeer.withValue(peer, val) do: + map = val[] + do: + map = newTable[Hash256, Time]() + ctx.knownByPeer[peer] = map - newHashes = newSeqOfCap[Hash256](txHashes.len) - for txHash in txHashes: - if txHash notin map: - map[txHash] = getTime() - newHashes.add txHash + newHashes = newSeqOfCap[Hash256](txHashes.len) + for txHash in txHashes: + if txHash notin map: + map[txHash] = getTime() + newHashes.add txHash # ------------------------------------------------------------------------------ # Private functions: async workers # ------------------------------------------------------------------------------ -proc sendNewTxHashes(ctx: EthWireRef, - txHashes: seq[Hash256], - peers: seq[Peer]): Future[void] {.async.} = - try: - for peer in peers: - # Add to known tx hashes and get hashes still to send to peer - var hashesToSend: seq[Hash256] - ctx.addToKnownByPeer(txHashes, peer, hashesToSend) - - # Broadcast to peer if at least 1 new tx hash to announce - if hashesToSend.len > 0: - # Currently only one protocol version is available as compiled - when ethVersion == 68: - await newPooledTransactionHashes( - peer, - 1u8.repeat hashesToSend.len, # type - 0.repeat hashesToSend.len, # sizes - hashesToSend) - else: - await newPooledTransactionHashes(peer, hashesToSend) - - except TransportError: - debug "Transport got closed during sendNewTxHashes" - except CatchableError as e: - debug "Exception in sendNewTxHashes", exc = e.name, err = e.msg - -proc sendTransactions(ctx: EthWireRef, +when txpool_enabled: + proc sendNewTxHashes(ctx: EthWireRef, txHashes: seq[Hash256], - txs: seq[Transaction], peers: seq[Peer]): Future[void] {.async.} = - try: - for peer in peers: - # This is used to avoid re-sending along pooledTxHashes - # announcements/re-broadcasts - ctx.addToKnownByPeer(txHashes, peer) - await peer.transactions(txs) + try: + for peer in peers: + # Add to known tx hashes and get hashes still to send to peer + var hashesToSend: seq[Hash256] + ctx.addToKnownByPeer(txHashes, peer, hashesToSend) - except TransportError: - debug "Transport got closed during sendTransactions" - except CatchableError as e: - debug "Exception in sendTransactions", exc = e.name, err = e.msg + # Broadcast to peer if at least 1 new tx hash to announce + if hashesToSend.len > 0: + # Currently only one protocol version is available as compiled + when ethVersion == 68: + await newPooledTransactionHashes( + peer, + 1u8.repeat hashesToSend.len, # type + 0.repeat hashesToSend.len, # sizes + hashesToSend) + else: + await newPooledTransactionHashes(peer, hashesToSend) -proc fetchTransactions(ctx: EthWireRef, reqHashes: seq[Hash256], peer: Peer): Future[void] {.async.} = - debug "fetchTx: requesting txs", - number = reqHashes.len + except TransportError: + debug "Transport got closed during sendNewTxHashes" + except CatchableError as e: + debug "Exception in sendNewTxHashes", exc = e.name, err = e.msg - try: + proc sendTransactions(ctx: EthWireRef, + txHashes: seq[Hash256], + txs: seq[Transaction], + peers: seq[Peer]): Future[void] {.async.} = + try: + for peer in peers: + # This is used to avoid re-sending along pooledTxHashes + # announcements/re-broadcasts + ctx.addToKnownByPeer(txHashes, peer) + await peer.transactions(txs) - let res = await peer.getPooledTransactions(reqHashes) - if res.isNone: - error "not able to get pooled transactions" + except TransportError: + debug "Transport got closed during sendTransactions" + except CatchableError as e: + debug "Exception in sendTransactions", exc = e.name, err = e.msg + + proc fetchTransactions(ctx: EthWireRef, reqHashes: seq[Hash256], peer: Peer): Future[void] {.async.} = + debug "fetchTx: requesting txs", + number = reqHashes.len + + try: + + let res = await peer.getPooledTransactions(reqHashes) + if res.isNone: + error "not able to get pooled transactions" + return + + let txs = res.get() + debug "fetchTx: received requested txs", + number = txs.transactions.len + + # Remove from pending list regardless if tx is in result + for tx in txs.transactions: + let txHash = rlpHash(tx) + ctx.pending.excl txHash + + ctx.txPool.add(txs.transactions) + + except TransportError: + debug "Transport got closed during fetchTransactions" + return + except CatchableError as e: + debug "Exception in fetchTransactions", exc = e.name, err = e.msg return - let txs = res.get() - debug "fetchTx: received requested txs", - number = txs.transactions.len + var newTxHashes = newSeqOfCap[Hash256](reqHashes.len) + for txHash in reqHashes: + if ctx.txPool.inPoolAndOk(txHash): + newTxHashes.add txHash - # Remove from pending list regardless if tx is in result - for tx in txs.transactions: - let txHash = rlpHash(tx) - ctx.pending.excl txHash + let peers = ctx.getPeers(peer) + if peers.len == 0 or newTxHashes.len == 0: + return - ctx.txPool.add(txs.transactions) - - except TransportError: - debug "Transport got closed during fetchTransactions" - return - except CatchableError as e: - debug "Exception in fetchTransactions", exc = e.name, err = e.msg - return - - var newTxHashes = newSeqOfCap[Hash256](reqHashes.len) - for txHash in reqHashes: - if ctx.txPool.inPoolAndOk(txHash): - newTxHashes.add txHash - - let peers = ctx.getPeers(peer) - if peers.len == 0 or newTxHashes.len == 0: - return - - await ctx.sendNewTxHashes(newTxHashes, peers) + await ctx.sendNewTxHashes(newTxHashes, peers) # ------------------------------------------------------------------------------ # Private functions: peer observer # ------------------------------------------------------------------------------ proc onPeerConnected(ctx: EthWireRef, peer: Peer) = - if ctx.enableTxPool != Enabled: - when trMissingOrDisabledGossipOk: - notEnabled("onPeerConnected") - return + when txpool_enabled: + var txHashes = newSeqOfCap[Hash256](ctx.txPool.numTxs) + for txHash, item in okPairs(ctx.txPool): + txHashes.add txHash - var txHashes = newSeqOfCap[Hash256](ctx.txPool.numTxs) - for txHash, item in okPairs(ctx.txPool): - txHashes.add txHash + if txHashes.len == 0: + return - if txHashes.len == 0: - return + debug "announce tx hashes to newly connected peer", + number = txHashes.len - debug "announce tx hashes to newly connected peer", - number = txHashes.len - - asyncSpawn ctx.sendNewTxHashes(txHashes, @[peer]) + asyncSpawn ctx.sendNewTxHashes(txHashes, @[peer]) + else: + discard proc onPeerDisconnected(ctx: EthWireRef, peer: Peer) = debug "remove peer from knownByPeer", @@ -304,43 +278,11 @@ proc new*(_: type EthWireRef, chain: chain, txPool: txPool, peerPool: peerPool, - enableTxPool: Enabled, lastCleanup: getTime()) - if txPool.isNil: - ctx.enableTxPool = NotAvailable - when trMissingOrDisabledGossipOk: - trace "New eth handler, minimal/outbound support only" ctx.setupPeerObserver() ctx -# ------------------------------------------------------------------------------ -# Public functions: callbacks setters -# ------------------------------------------------------------------------------ - -proc setNewBlockHandler*(ctx: EthWireRef, handler: NewBlockHandler, arg: pointer) = - ctx.newBlockHandler = NewBlockHandlerPair( - arg: arg, - handler: handler - ) - -proc setNewBlockHashesHandler*(ctx: EthWireRef, handler: NewBlockHashesHandler, arg: pointer) = - ctx.newBlockHashesHandler = NewBlockHashesHandlerPair( - arg: arg, - handler: handler - ) - -# ------------------------------------------------------------------------------ -# Public getters/setters -# ------------------------------------------------------------------------------ - -proc `txPoolEnabled=`*(ctx: EthWireRef; ena: bool) {.gcsafe, raises: [].} = - if ctx.enableTxPool != NotAvailable: - ctx.enableTxPool = if ena: Enabled else: Suspended - -proc txPoolEnabled*(ctx: EthWireRef): bool {.gcsafe, raises: [].} = - ctx.enableTxPool == Enabled - # ------------------------------------------------------------------------------ # Public functions: eth wire protocol handlers # ------------------------------------------------------------------------------ @@ -391,15 +333,19 @@ method getPooledTxs*(ctx: EthWireRef, hashes: openArray[Hash256]): Result[seq[PooledTransaction], string] {.gcsafe.} = - let txPool = ctx.txPool - var list: seq[PooledTransaction] - for txHash in hashes: - let res = txPool.getItem(txHash) - if res.isOk: - list.add res.value.pooledTx - else: - trace "handlers.getPooledTxs: tx not found", txHash - ok(list) + + when txpool_enabled: + let txPool = ctx.txPool + var list: seq[PooledTransaction] + for txHash in hashes: + let res = txPool.getItem(txHash) + if res.isOk: + list.add res.value.pooledTx + else: + trace "handlers.getPooledTxs: tx not found", txHash + ok(list) + else: + err("txpool disabled") method getBlockBodies*(ctx: EthWireRef, hashes: openArray[Hash256]): @@ -446,55 +392,53 @@ method handleAnnouncedTxs*(ctx: EthWireRef, Result[void, string] {.gcsafe.} = - try: - if ctx.enableTxPool != Enabled: - when trMissingOrDisabledGossipOk: - notEnabled("handleAnnouncedTxs") + when txpool_enabled: + try: + if txs.len == 0: + return ok() + + debug "received new transactions", + number = txs.len + + if ctx.lastCleanup - getTime() > POOLED_STORAGE_TIME_LIMIT: + ctx.cleanupKnownByPeer() + + var txHashes = newSeqOfCap[Hash256](txs.len) + for tx in txs: + txHashes.add rlpHash(tx) + + ctx.addToKnownByPeer(txHashes, peer) + for tx in txs: + if tx.versionedHashes.len > 0: + # EIP-4844 blobs are not persisted and cannot be broadcasted + continue + ctx.txPool.add PooledTransaction(tx: tx) + + var newTxHashes = newSeqOfCap[Hash256](txHashes.len) + var validTxs = newSeqOfCap[Transaction](txHashes.len) + for i, txHash in txHashes: + # Nodes must not automatically broadcast blob transactions to + # their peers. per EIP-4844 spec + if ctx.txPool.inPoolAndOk(txHash) and txs[i].txType != TxEip4844: + newTxHashes.add txHash + validTxs.add txs[i] + + let + peers = ctx.getPeers(peer) + numPeers = peers.len + sendFull = max(1, numPeers div NUM_PEERS_REBROADCAST_QUOTIENT) + + if numPeers == 0 or validTxs.len == 0: + return ok() + + asyncSpawn ctx.sendTransactions(txHashes, validTxs, peers[0.. POOLED_STORAGE_TIME_LIMIT: - ctx.cleanupKnownByPeer() - - var txHashes = newSeqOfCap[Hash256](txs.len) - for tx in txs: - txHashes.add rlpHash(tx) - - ctx.addToKnownByPeer(txHashes, peer) - for tx in txs: - if tx.versionedHashes.len > 0: - # EIP-4844 blobs are not persisted and cannot be broadcasted - continue - ctx.txPool.add PooledTransaction(tx: tx) - - var newTxHashes = newSeqOfCap[Hash256](txHashes.len) - var validTxs = newSeqOfCap[Transaction](txHashes.len) - for i, txHash in txHashes: - # Nodes must not automatically broadcast blob transactions to - # their peers. per EIP-4844 spec - if ctx.txPool.inPoolAndOk(txHash) and txs[i].txType != TxEip4844: - newTxHashes.add txHash - validTxs.add txs[i] - - let - peers = ctx.getPeers(peer) - numPeers = peers.len - sendFull = max(1, numPeers div NUM_PEERS_REBROADCAST_QUOTIENT) - - if numPeers == 0 or validTxs.len == 0: - return ok() - - asyncSpawn ctx.sendTransactions(txHashes, validTxs, peers[0..