Disable txpool in eth wire protocol handler (#2540)

This commit is contained in:
andri lim 2024-08-06 11:26:55 +07:00 committed by GitHub
parent 01b5c08763
commit 9dacfed943
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 200 additions and 272 deletions

View File

@ -23,7 +23,7 @@ const
proc getPayloadBodyByHeader(db: CoreDbRef, proc getPayloadBodyByHeader(db: CoreDbRef,
header: common.BlockHeader, header: common.BlockHeader,
output: var seq[Opt[ExecutionPayloadBodyV1]]) = output: var seq[Opt[ExecutionPayloadBodyV1]]) {.gcsafe, raises:[].} =
var body: common.BlockBody var body: common.BlockBody
if not db.getBlockBody(header, body): if not db.getBlockBody(header, body):

View File

@ -35,7 +35,7 @@ proc getMultiKeys*(
com: CommonRef, com: CommonRef,
blockHeader: BlockHeader, blockHeader: BlockHeader,
statePostExecution: bool): MultiKeysRef statePostExecution: bool): MultiKeysRef
{.raises: [RlpError, BlockNotFound, ValueError].} = {.raises: [BlockNotFound, ValueError].} =
let let
chainDB = com.db chainDB = com.db

View File

@ -11,15 +11,14 @@
{.push raises: [].} {.push raises: [].}
import import
std/[tables, times, hashes, sets, sequtils], std/[tables, times, hashes, sets],
chronicles, chronos, chronicles, chronos,
stew/endians2, stew/endians2,
eth/p2p, eth/p2p,
eth/p2p/peer_pool, eth/p2p/peer_pool,
".."/[types, protocol], ".."/[types, protocol],
../protocol/eth/eth_types, ../protocol/eth/eth_types,
../protocol/trace_config, # gossip noise control ../../core/[chain, tx_pool]
../../core/[chain, tx_pool, tx_pool/tx_item]
logScope: logScope:
topics = "eth-wire" topics = "eth-wire"
@ -27,47 +26,22 @@ logScope:
type type
HashToTime = TableRef[Hash256, Time] HashToTime = TableRef[Hash256, Time]
NewBlockHandler* = proc(
arg: pointer,
peer: Peer,
blk: EthBlock,
totalDifficulty: DifficultyInt) {.
gcsafe, raises: [CatchableError].}
NewBlockHashesHandler* = proc(
arg: pointer,
peer: Peer,
hashes: openArray[NewBlockHashesAnnounce]) {.
gcsafe, raises: [CatchableError].}
NewBlockHandlerPair = object
arg: pointer
handler: NewBlockHandler
NewBlockHashesHandlerPair = object
arg: pointer
handler: NewBlockHashesHandler
EthWireRunState = enum
Enabled
Suspended
NotAvailable
EthWireRef* = ref object of EthWireBase EthWireRef* = ref object of EthWireBase
db: CoreDbRef db: CoreDbRef
chain: ChainRef chain: ChainRef
txPool: TxPoolRef txPool: TxPoolRef
peerPool: PeerPool peerPool: PeerPool
enableTxPool: EthWireRunState
knownByPeer: Table[Peer, HashToTime] knownByPeer: Table[Peer, HashToTime]
pending: HashSet[Hash256] pending: HashSet[Hash256]
lastCleanup: Time lastCleanup: Time
newBlockHandler: NewBlockHandlerPair
newBlockHashesHandler: NewBlockHashesHandlerPair
const const
NUM_PEERS_REBROADCAST_QUOTIENT = 4 txpool_enabled = defined(enable_txpool_in_synchronizer)
POOLED_STORAGE_TIME_LIMIT = initDuration(minutes = 20)
when txpool_enabled:
const
NUM_PEERS_REBROADCAST_QUOTIENT = 4
POOLED_STORAGE_TIME_LIMIT = initDuration(minutes = 20)
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Private functions: helper functions # Private functions: helper functions
@ -110,169 +84,169 @@ proc blockHeader(db: CoreDbRef,
proc hash(peer: Peer): hashes.Hash {.used.} = proc hash(peer: Peer): hashes.Hash {.used.} =
hash(peer.remote) hash(peer.remote)
proc getPeers(ctx: EthWireRef, thisPeer: Peer): seq[Peer] = when txpool_enabled:
# do not send back tx or txhash to thisPeer proc getPeers(ctx: EthWireRef, thisPeer: Peer): seq[Peer] =
for peer in peers(ctx.peerPool): # do not send back tx or txhash to thisPeer
if peer != thisPeer: for peer in peers(ctx.peerPool):
result.add peer if peer != thisPeer:
result.add peer
proc cleanupKnownByPeer(ctx: EthWireRef) = proc cleanupKnownByPeer(ctx: EthWireRef) =
let now = getTime() let now = getTime()
var tmp = HashSet[Hash256]() var tmp = HashSet[Hash256]()
for _, map in ctx.knownByPeer: for _, map in ctx.knownByPeer:
for hash, time in map: for hash, time in map:
if time - now >= POOLED_STORAGE_TIME_LIMIT: if time - now >= POOLED_STORAGE_TIME_LIMIT:
tmp.incl hash tmp.incl hash
for hash in tmp: for hash in tmp:
map.del(hash) map.del(hash)
tmp.clear() tmp.clear()
var tmpPeer = HashSet[Peer]() var tmpPeer = HashSet[Peer]()
for peer, map in ctx.knownByPeer: for peer, map in ctx.knownByPeer:
if map.len == 0: if map.len == 0:
tmpPeer.incl peer tmpPeer.incl peer
for peer in tmpPeer: for peer in tmpPeer:
ctx.knownByPeer.del peer ctx.knownByPeer.del peer
ctx.lastCleanup = now ctx.lastCleanup = now
proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash256], peer: Peer) = proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash256], peer: Peer) =
var map: HashToTime var map: HashToTime
ctx.knownByPeer.withValue(peer, val) do: ctx.knownByPeer.withValue(peer, val) do:
map = val[] map = val[]
do: do:
map = newTable[Hash256, Time]() map = newTable[Hash256, Time]()
ctx.knownByPeer[peer] = map ctx.knownByPeer[peer] = map
for txHash in txHashes: for txHash in txHashes:
if txHash notin map: if txHash notin map:
map[txHash] = getTime() map[txHash] = getTime()
proc addToKnownByPeer(ctx: EthWireRef, proc addToKnownByPeer(ctx: EthWireRef,
txHashes: openArray[Hash256], txHashes: openArray[Hash256],
peer: Peer, peer: Peer,
newHashes: var seq[Hash256]) = newHashes: var seq[Hash256]) =
var map: HashToTime var map: HashToTime
ctx.knownByPeer.withValue(peer, val) do: ctx.knownByPeer.withValue(peer, val) do:
map = val[] map = val[]
do: do:
map = newTable[Hash256, Time]() map = newTable[Hash256, Time]()
ctx.knownByPeer[peer] = map ctx.knownByPeer[peer] = map
newHashes = newSeqOfCap[Hash256](txHashes.len) newHashes = newSeqOfCap[Hash256](txHashes.len)
for txHash in txHashes: for txHash in txHashes:
if txHash notin map: if txHash notin map:
map[txHash] = getTime() map[txHash] = getTime()
newHashes.add txHash newHashes.add txHash
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Private functions: async workers # Private functions: async workers
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc sendNewTxHashes(ctx: EthWireRef, when txpool_enabled:
txHashes: seq[Hash256], proc sendNewTxHashes(ctx: EthWireRef,
peers: seq[Peer]): Future[void] {.async.} =
try:
for peer in peers:
# Add to known tx hashes and get hashes still to send to peer
var hashesToSend: seq[Hash256]
ctx.addToKnownByPeer(txHashes, peer, hashesToSend)
# Broadcast to peer if at least 1 new tx hash to announce
if hashesToSend.len > 0:
# Currently only one protocol version is available as compiled
when ethVersion == 68:
await newPooledTransactionHashes(
peer,
1u8.repeat hashesToSend.len, # type
0.repeat hashesToSend.len, # sizes
hashesToSend)
else:
await newPooledTransactionHashes(peer, hashesToSend)
except TransportError:
debug "Transport got closed during sendNewTxHashes"
except CatchableError as e:
debug "Exception in sendNewTxHashes", exc = e.name, err = e.msg
proc sendTransactions(ctx: EthWireRef,
txHashes: seq[Hash256], txHashes: seq[Hash256],
txs: seq[Transaction],
peers: seq[Peer]): Future[void] {.async.} = peers: seq[Peer]): Future[void] {.async.} =
try: try:
for peer in peers: for peer in peers:
# This is used to avoid re-sending along pooledTxHashes # Add to known tx hashes and get hashes still to send to peer
# announcements/re-broadcasts var hashesToSend: seq[Hash256]
ctx.addToKnownByPeer(txHashes, peer) ctx.addToKnownByPeer(txHashes, peer, hashesToSend)
await peer.transactions(txs)
except TransportError: # Broadcast to peer if at least 1 new tx hash to announce
debug "Transport got closed during sendTransactions" if hashesToSend.len > 0:
except CatchableError as e: # Currently only one protocol version is available as compiled
debug "Exception in sendTransactions", exc = e.name, err = e.msg when ethVersion == 68:
await newPooledTransactionHashes(
peer,
1u8.repeat hashesToSend.len, # type
0.repeat hashesToSend.len, # sizes
hashesToSend)
else:
await newPooledTransactionHashes(peer, hashesToSend)
proc fetchTransactions(ctx: EthWireRef, reqHashes: seq[Hash256], peer: Peer): Future[void] {.async.} = except TransportError:
debug "fetchTx: requesting txs", debug "Transport got closed during sendNewTxHashes"
number = reqHashes.len except CatchableError as e:
debug "Exception in sendNewTxHashes", exc = e.name, err = e.msg
try: proc sendTransactions(ctx: EthWireRef,
txHashes: seq[Hash256],
txs: seq[Transaction],
peers: seq[Peer]): Future[void] {.async.} =
try:
for peer in peers:
# This is used to avoid re-sending along pooledTxHashes
# announcements/re-broadcasts
ctx.addToKnownByPeer(txHashes, peer)
await peer.transactions(txs)
let res = await peer.getPooledTransactions(reqHashes) except TransportError:
if res.isNone: debug "Transport got closed during sendTransactions"
error "not able to get pooled transactions" except CatchableError as e:
debug "Exception in sendTransactions", exc = e.name, err = e.msg
proc fetchTransactions(ctx: EthWireRef, reqHashes: seq[Hash256], peer: Peer): Future[void] {.async.} =
debug "fetchTx: requesting txs",
number = reqHashes.len
try:
let res = await peer.getPooledTransactions(reqHashes)
if res.isNone:
error "not able to get pooled transactions"
return
let txs = res.get()
debug "fetchTx: received requested txs",
number = txs.transactions.len
# Remove from pending list regardless if tx is in result
for tx in txs.transactions:
let txHash = rlpHash(tx)
ctx.pending.excl txHash
ctx.txPool.add(txs.transactions)
except TransportError:
debug "Transport got closed during fetchTransactions"
return
except CatchableError as e:
debug "Exception in fetchTransactions", exc = e.name, err = e.msg
return return
let txs = res.get() var newTxHashes = newSeqOfCap[Hash256](reqHashes.len)
debug "fetchTx: received requested txs", for txHash in reqHashes:
number = txs.transactions.len if ctx.txPool.inPoolAndOk(txHash):
newTxHashes.add txHash
# Remove from pending list regardless if tx is in result let peers = ctx.getPeers(peer)
for tx in txs.transactions: if peers.len == 0 or newTxHashes.len == 0:
let txHash = rlpHash(tx) return
ctx.pending.excl txHash
ctx.txPool.add(txs.transactions) await ctx.sendNewTxHashes(newTxHashes, peers)
except TransportError:
debug "Transport got closed during fetchTransactions"
return
except CatchableError as e:
debug "Exception in fetchTransactions", exc = e.name, err = e.msg
return
var newTxHashes = newSeqOfCap[Hash256](reqHashes.len)
for txHash in reqHashes:
if ctx.txPool.inPoolAndOk(txHash):
newTxHashes.add txHash
let peers = ctx.getPeers(peer)
if peers.len == 0 or newTxHashes.len == 0:
return
await ctx.sendNewTxHashes(newTxHashes, peers)
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Private functions: peer observer # Private functions: peer observer
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc onPeerConnected(ctx: EthWireRef, peer: Peer) = proc onPeerConnected(ctx: EthWireRef, peer: Peer) =
if ctx.enableTxPool != Enabled: when txpool_enabled:
when trMissingOrDisabledGossipOk: var txHashes = newSeqOfCap[Hash256](ctx.txPool.numTxs)
notEnabled("onPeerConnected") for txHash, item in okPairs(ctx.txPool):
return txHashes.add txHash
var txHashes = newSeqOfCap[Hash256](ctx.txPool.numTxs) if txHashes.len == 0:
for txHash, item in okPairs(ctx.txPool): return
txHashes.add txHash
if txHashes.len == 0: debug "announce tx hashes to newly connected peer",
return number = txHashes.len
debug "announce tx hashes to newly connected peer", asyncSpawn ctx.sendNewTxHashes(txHashes, @[peer])
number = txHashes.len else:
discard
asyncSpawn ctx.sendNewTxHashes(txHashes, @[peer])
proc onPeerDisconnected(ctx: EthWireRef, peer: Peer) = proc onPeerDisconnected(ctx: EthWireRef, peer: Peer) =
debug "remove peer from knownByPeer", debug "remove peer from knownByPeer",
@ -304,43 +278,11 @@ proc new*(_: type EthWireRef,
chain: chain, chain: chain,
txPool: txPool, txPool: txPool,
peerPool: peerPool, peerPool: peerPool,
enableTxPool: Enabled,
lastCleanup: getTime()) lastCleanup: getTime())
if txPool.isNil:
ctx.enableTxPool = NotAvailable
when trMissingOrDisabledGossipOk:
trace "New eth handler, minimal/outbound support only"
ctx.setupPeerObserver() ctx.setupPeerObserver()
ctx ctx
# ------------------------------------------------------------------------------
# Public functions: callbacks setters
# ------------------------------------------------------------------------------
proc setNewBlockHandler*(ctx: EthWireRef, handler: NewBlockHandler, arg: pointer) =
ctx.newBlockHandler = NewBlockHandlerPair(
arg: arg,
handler: handler
)
proc setNewBlockHashesHandler*(ctx: EthWireRef, handler: NewBlockHashesHandler, arg: pointer) =
ctx.newBlockHashesHandler = NewBlockHashesHandlerPair(
arg: arg,
handler: handler
)
# ------------------------------------------------------------------------------
# Public getters/setters
# ------------------------------------------------------------------------------
proc `txPoolEnabled=`*(ctx: EthWireRef; ena: bool) {.gcsafe, raises: [].} =
if ctx.enableTxPool != NotAvailable:
ctx.enableTxPool = if ena: Enabled else: Suspended
proc txPoolEnabled*(ctx: EthWireRef): bool {.gcsafe, raises: [].} =
ctx.enableTxPool == Enabled
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public functions: eth wire protocol handlers # Public functions: eth wire protocol handlers
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -391,15 +333,19 @@ method getPooledTxs*(ctx: EthWireRef,
hashes: openArray[Hash256]): hashes: openArray[Hash256]):
Result[seq[PooledTransaction], string] Result[seq[PooledTransaction], string]
{.gcsafe.} = {.gcsafe.} =
let txPool = ctx.txPool
var list: seq[PooledTransaction] when txpool_enabled:
for txHash in hashes: let txPool = ctx.txPool
let res = txPool.getItem(txHash) var list: seq[PooledTransaction]
if res.isOk: for txHash in hashes:
list.add res.value.pooledTx let res = txPool.getItem(txHash)
else: if res.isOk:
trace "handlers.getPooledTxs: tx not found", txHash list.add res.value.pooledTx
ok(list) else:
trace "handlers.getPooledTxs: tx not found", txHash
ok(list)
else:
err("txpool disabled")
method getBlockBodies*(ctx: EthWireRef, method getBlockBodies*(ctx: EthWireRef,
hashes: openArray[Hash256]): hashes: openArray[Hash256]):
@ -446,55 +392,53 @@ method handleAnnouncedTxs*(ctx: EthWireRef,
Result[void, string] Result[void, string]
{.gcsafe.} = {.gcsafe.} =
try: when txpool_enabled:
if ctx.enableTxPool != Enabled: try:
when trMissingOrDisabledGossipOk: if txs.len == 0:
notEnabled("handleAnnouncedTxs") return ok()
debug "received new transactions",
number = txs.len
if ctx.lastCleanup - getTime() > POOLED_STORAGE_TIME_LIMIT:
ctx.cleanupKnownByPeer()
var txHashes = newSeqOfCap[Hash256](txs.len)
for tx in txs:
txHashes.add rlpHash(tx)
ctx.addToKnownByPeer(txHashes, peer)
for tx in txs:
if tx.versionedHashes.len > 0:
# EIP-4844 blobs are not persisted and cannot be broadcasted
continue
ctx.txPool.add PooledTransaction(tx: tx)
var newTxHashes = newSeqOfCap[Hash256](txHashes.len)
var validTxs = newSeqOfCap[Transaction](txHashes.len)
for i, txHash in txHashes:
# Nodes must not automatically broadcast blob transactions to
# their peers. per EIP-4844 spec
if ctx.txPool.inPoolAndOk(txHash) and txs[i].txType != TxEip4844:
newTxHashes.add txHash
validTxs.add txs[i]
let
peers = ctx.getPeers(peer)
numPeers = peers.len
sendFull = max(1, numPeers div NUM_PEERS_REBROADCAST_QUOTIENT)
if numPeers == 0 or validTxs.len == 0:
return ok()
asyncSpawn ctx.sendTransactions(txHashes, validTxs, peers[0..<sendFull])
asyncSpawn ctx.sendNewTxHashes(newTxHashes, peers[sendFull..^1])
return ok() return ok()
except CatchableError as exc:
if txs.len == 0: return err(exc.msg)
return ok() else:
err("txpool disabled")
debug "received new transactions",
number = txs.len
if ctx.lastCleanup - getTime() > POOLED_STORAGE_TIME_LIMIT:
ctx.cleanupKnownByPeer()
var txHashes = newSeqOfCap[Hash256](txs.len)
for tx in txs:
txHashes.add rlpHash(tx)
ctx.addToKnownByPeer(txHashes, peer)
for tx in txs:
if tx.versionedHashes.len > 0:
# EIP-4844 blobs are not persisted and cannot be broadcasted
continue
ctx.txPool.add PooledTransaction(tx: tx)
var newTxHashes = newSeqOfCap[Hash256](txHashes.len)
var validTxs = newSeqOfCap[Transaction](txHashes.len)
for i, txHash in txHashes:
# Nodes must not automatically broadcast blob transactions to
# their peers. per EIP-4844 spec
if ctx.txPool.inPoolAndOk(txHash) and txs[i].txType != TxEip4844:
newTxHashes.add txHash
validTxs.add txs[i]
let
peers = ctx.getPeers(peer)
numPeers = peers.len
sendFull = max(1, numPeers div NUM_PEERS_REBROADCAST_QUOTIENT)
if numPeers == 0 or validTxs.len == 0:
return ok()
asyncSpawn ctx.sendTransactions(txHashes, validTxs, peers[0..<sendFull])
asyncSpawn ctx.sendNewTxHashes(newTxHashes, peers[sendFull..^1])
return ok()
except CatchableError as exc:
return err(exc.msg)
method handleAnnouncedTxsHashes*( method handleAnnouncedTxsHashes*(
ctx: EthWireRef; ctx: EthWireRef;
@ -504,11 +448,6 @@ method handleAnnouncedTxsHashes*(
txHashes: openArray[Hash256]; txHashes: openArray[Hash256];
): Result[void, string] = ): Result[void, string] =
## `Eth68` method ## `Eth68` method
if ctx.enableTxPool != Enabled:
when trMissingOrDisabledGossipOk:
notEnabled("handleAnnouncedTxsHashes")
return ok()
notImplemented "handleAnnouncedTxsHashes()/eth68" notImplemented "handleAnnouncedTxsHashes()/eth68"
method handleNewBlock*(ctx: EthWireRef, method handleNewBlock*(ctx: EthWireRef,

View File

@ -19,17 +19,6 @@ import
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public functions: convenience mappings for `eth` # Public functions: convenience mappings for `eth`
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc setEthHandlerNewBlocksAndHashes*(
node: EthereumNode;
blockHandler: NewBlockHandler;
hashesHandler: NewBlockHashesHandler;
arg: pointer;
) {.gcsafe, raises: [].} =
let w = EthWireRef(node.protocolState protocol.eth)
w.setNewBlockHandler(blockHandler, arg)
w.setNewBlockHashesHandler(hashesHandler, arg)
proc addEthHandlerCapability*( proc addEthHandlerCapability*(
node: EthereumNode; node: EthereumNode;
peerPool: PeerPool; peerPool: PeerPool;