add exception handlers to transaction exchange code

This commit is contained in:
jangko 2022-11-14 12:43:05 +07:00
parent 342dbdea42
commit c7b3c374f0
No known key found for this signature in database
GPG Key ID: 31702AE10541E6B9
1 changed files with 50 additions and 27 deletions

View File

@ -67,24 +67,38 @@ proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash256], peer: Peer,
proc sendNewTxHashes(ctx: EthWireRef,
txHashes: seq[Hash256],
peers: seq[Peer]): Future[void] {.async.} =
for peer in peers:
# Add to known tx hashes and get hashes still to send to peer
var hashesToSend: seq[Hash256]
ctx.addToKnownByPeer(txHashes, peer, hashesToSend)
try:
# Broadcast to peer if at least 1 new tx hash to announce
if hashesToSend.len > 0:
await peer.newPooledTransactionHashes(hashesToSend)
for peer in peers:
# Add to known tx hashes and get hashes still to send to peer
var hashesToSend: seq[Hash256]
ctx.addToKnownByPeer(txHashes, peer, hashesToSend)
# Broadcast to peer if at least 1 new tx hash to announce
if hashesToSend.len > 0:
await peer.newPooledTransactionHashes(hashesToSend)
except TransportError:
debug "Transport got closed during sendNewTxHashes"
except CatchableError as e:
debug "Exception in sendNewTxHashes", exc = e.name, err = e.msg
proc sendTransactions(ctx: EthWireRef,
txHashes: seq[Hash256],
txs: seq[Transaction],
peers: seq[Peer]): Future[void] {.async.} =
for peer in peers:
# This is used to avoid re-sending along pooledTxHashes
# announcements/re-broadcasts
ctx.addToKnownByPeer(txHashes, peer)
await peer.transactions(txs)
try:
for peer in peers:
# This is used to avoid re-sending along pooledTxHashes
# announcements/re-broadcasts
ctx.addToKnownByPeer(txHashes, peer)
await peer.transactions(txs)
except TransportError:
debug "Transport got closed during sendTransactions"
except CatchableError as e:
debug "Exception in sendTransactions", exc = e.name, err = e.msg
proc inPool(ctx: EthWireRef, txHash: Hash256): bool =
let res = ctx.txPool.getItem(txHash)
@ -292,21 +306,30 @@ proc fetchTransactions(ctx: EthWireRef, reqHashes: seq[Hash256], peer: Peer): Fu
debug "fetchTx: requesting txs",
number = reqHashes.len
let res = await peer.getPooledTransactions(reqHashes)
if res.isNone:
error "not able to get pooled transactions"
try:
let res = await peer.getPooledTransactions(reqHashes)
if res.isNone:
error "not able to get pooled transactions"
return
let txs = res.get()
debug "fetchTx: received requested txs",
number = txs.transactions.len
# Remove from pending list regardless if tx is in result
for tx in txs.transactions:
let txHash = rlpHash(tx)
ctx.pending.excl txHash
ctx.txPool.jobAddTxs(txs.transactions)
except TransportError:
debug "Transport got closed during fetchTransactions"
return
except CatchableError as e:
debug "Exception in fetchTransactions", exc = e.name, err = e.msg
return
let txs = res.get()
debug "fetchTx: received requested txs",
number = txs.transactions.len
# Remove from pending list regardless if tx is in result
for tx in txs.transactions:
let txHash = rlpHash(tx)
ctx.pending.excl txHash
ctx.txPool.jobAddTxs(txs.transactions)
var newTxHashes = newSeqOfCap[Hash256](reqHashes.len)
for txHash in reqHashes:
@ -314,7 +337,7 @@ proc fetchTransactions(ctx: EthWireRef, reqHashes: seq[Hash256], peer: Peer): Fu
newTxHashes.add txHash
let peers = ctx.getPeers(peer)
if peers.len == 0:
if peers.len == 0 or newTxHashes.len == 0:
return
await ctx.sendNewTxHashes(newTxHashes, peers)