Commit 45bc6422a0: The current getCanonicalHead of the core db should not be confused with ForkedChain.latestHeader. getCanonicalHead should therefore only be used in restricted cases, e.g. when initializing ForkedChain.
# Nimbus
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed under either of
#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
#    http://www.apache.org/licenses/LICENSE-2.0)
#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
#    http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.

{.push raises: [].}

import
  std/[tables, times, hashes, sets],
  chronicles, chronos,
  stew/endians2,
  eth/p2p,
  eth/p2p/peer_pool,
  ../protocol,
  ../protocol/eth/eth_types,
  ../../core/[chain, tx_pool]

logScope:
  topics = "eth-wire"

type
  HashToTime = TableRef[Hash32, Time]

  EthWireRef* = ref object of EthWireBase
    db: CoreDbRef
    chain: ForkedChainRef
    txPool: TxPoolRef
    peerPool: PeerPool
    knownByPeer: Table[Peer, HashToTime]
    pending: HashSet[Hash32]
    lastCleanup: Time

const
  extraTraceMessages = false
    ## Enable additional logging noise

  txpool_enabled = defined(enable_txpool_in_synchronizer)

when txpool_enabled:
  const
    NUM_PEERS_REBROADCAST_QUOTIENT = 4
    POOLED_STORAGE_TIME_LIMIT = initDuration(minutes = 20)

# ------------------------------------------------------------------------------
# Private functions: helper functions
# ------------------------------------------------------------------------------

proc notEnabled(name: string) {.used.} =
  debug "Wire handler method is disabled", meth = name

proc notImplemented(name: string) {.used.} =
  debug "Wire handler method not implemented", meth = name

proc successorHeader(db: CoreDbRef,
                     h: Header,
                     skip = 0'u): Opt[Header] =
  let offset = 1 + skip.BlockNumber
  if h.number <= (not 0.BlockNumber) - offset:
    let header = db.getBlockHeader(h.number + offset).valueOr:
      return Opt.none(Header)
    return Opt.some(header)
  Opt.none(Header)

proc ancestorHeader(db: CoreDbRef,
                    h: Header,
                    skip = 0'u): Opt[Header] =
  let offset = 1 + skip.BlockNumber
  if h.number >= offset:
    let header = db.getBlockHeader(h.number - offset).valueOr:
      return Opt.none(Header)
    return Opt.some(header)
  Opt.none(Header)

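# Worked example (hypothetical values): with skip = 2 the step size is
# offset = 1 + 2 = 3, so for a header at block 100 successorHeader returns
# the header of block 103 (if it exists) and ancestorHeader the one of
# block 97; with skip = 0 they return the immediate neighbours 101 and 99.
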
proc blockHeader(db: CoreDbRef,
                 b: BlockHashOrNumber): Opt[Header] =
  let header =
    if b.isHash:
      db.getBlockHeader(b.hash).valueOr:
        return Opt.none(Header)
    else:
      db.getBlockHeader(b.number).valueOr:
        return Opt.none(Header)
  Opt.some(header)

# ------------------------------------------------------------------------------
# Private functions: peers related functions
# ------------------------------------------------------------------------------

proc hash(peer: Peer): hashes.Hash {.used.} =
  hash(peer.remote)

when txpool_enabled:
  proc getPeers(ctx: EthWireRef, thisPeer: Peer): seq[Peer] =
    # do not send back tx or txhash to thisPeer
    for peer in peers(ctx.peerPool):
      if peer != thisPeer:
        result.add peer

  proc cleanupKnownByPeer(ctx: EthWireRef) =
    let now = getTime()
    var tmp = HashSet[Hash32]()
    for _, map in ctx.knownByPeer:
      for hash, time in map:
        # Expire entries that have been sitting in the cache longer than
        # the storage limit
        if now - time >= POOLED_STORAGE_TIME_LIMIT:
          tmp.incl hash
      for hash in tmp:
        map.del(hash)
      tmp.clear()

    var tmpPeer = HashSet[Peer]()
    for peer, map in ctx.knownByPeer:
      if map.len == 0:
        tmpPeer.incl peer

    for peer in tmpPeer:
      ctx.knownByPeer.del peer

    ctx.lastCleanup = now

  proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash32], peer: Peer) =
    var map: HashToTime
    ctx.knownByPeer.withValue(peer, val) do:
      map = val[]
    do:
      map = newTable[Hash32, Time]()
      ctx.knownByPeer[peer] = map

    for txHash in txHashes:
      if txHash notin map:
        map[txHash] = getTime()

  proc addToKnownByPeer(ctx: EthWireRef,
                        txHashes: openArray[Hash32],
                        peer: Peer,
                        newHashes: var seq[Hash32]) =
    var map: HashToTime
    ctx.knownByPeer.withValue(peer, val) do:
      map = val[]
    do:
      map = newTable[Hash32, Time]()
      ctx.knownByPeer[peer] = map

    newHashes = newSeqOfCap[Hash32](txHashes.len)
    for txHash in txHashes:
      if txHash notin map:
        map[txHash] = getTime()
        newHashes.add txHash

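  # The second overload additionally reports, via `newHashes`, which of the
  # given hashes the peer had not seen before. E.g. if the peer already knows
  # hash A but not hash B, only B ends up in `newHashes`, so only B is worth
  # announcing to it.
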
# ------------------------------------------------------------------------------
# Private functions: async workers
# ------------------------------------------------------------------------------

when txpool_enabled:
  proc sendNewTxHashes(ctx: EthWireRef,
                       txHashes: seq[Hash32],
                       peers: seq[Peer]): Future[void] {.async.} =
    try:
      for peer in peers:
        # Add to known tx hashes and get hashes still to send to peer
        var hashesToSend: seq[Hash32]
        ctx.addToKnownByPeer(txHashes, peer, hashesToSend)

        # Broadcast to peer if at least 1 new tx hash to announce
        if hashesToSend.len > 0:
          # Currently only one protocol version is available as compiled
          when ethVersion == 68:
            await newPooledTransactionHashes(
              peer,
              1u8.repeat hashesToSend.len, # type
              0.repeat hashesToSend.len,   # sizes
              hashesToSend)
          else:
            await newPooledTransactionHashes(peer, hashesToSend)

    except TransportError:
      debug "Transport got closed during sendNewTxHashes"
    except CatchableError as e:
      debug "Exception in sendNewTxHashes", exc = e.name, err = e.msg

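  # Note on the eth/68 announcement: NewPooledTransactionHashes carries three
  # parallel lists of equal length (transaction types, encoded sizes, hashes).
  # The call above fills the type list with 1 and the size list with 0 as
  # placeholders rather than the real per-transaction values.
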
  proc sendTransactions(ctx: EthWireRef,
                        txHashes: seq[Hash32],
                        txs: seq[Transaction],
                        peers: seq[Peer]): Future[void] {.async.} =
    try:
      for peer in peers:
        # This is used to avoid re-sending along pooledTxHashes
        # announcements/re-broadcasts
        ctx.addToKnownByPeer(txHashes, peer)
        await peer.transactions(txs)

    except TransportError:
      debug "Transport got closed during sendTransactions"
    except CatchableError as e:
      debug "Exception in sendTransactions", exc = e.name, err = e.msg

  proc fetchTransactions(ctx: EthWireRef, reqHashes: seq[Hash32], peer: Peer): Future[void] {.async.} =
    debug "fetchTx: requesting txs",
      number = reqHashes.len

    try:
      let res = await peer.getPooledTransactions(reqHashes)
      if res.isNone:
        error "not able to get pooled transactions"
        return

      let txs = res.get()
      debug "fetchTx: received requested txs",
        number = txs.transactions.len

      # Remove from pending list regardless of whether the tx is in the result
      for tx in txs.transactions:
        let txHash = rlpHash(tx)
        ctx.pending.excl txHash

      ctx.txPool.add(txs.transactions)

    except TransportError:
      debug "Transport got closed during fetchTransactions"
      return
    except CatchableError as e:
      debug "Exception in fetchTransactions", exc = e.name, err = e.msg
      return

    var newTxHashes = newSeqOfCap[Hash32](reqHashes.len)
    for txHash in reqHashes:
      if ctx.txPool.inPoolAndOk(txHash):
        newTxHashes.add txHash

    let peers = ctx.getPeers(peer)
    if peers.len == 0 or newTxHashes.len == 0:
      return

    await ctx.sendNewTxHashes(newTxHashes, peers)

# ------------------------------------------------------------------------------
# Private functions: peer observer
# ------------------------------------------------------------------------------

proc onPeerConnected(ctx: EthWireRef, peer: Peer) =
  when txpool_enabled:
    var txHashes = newSeqOfCap[Hash32](ctx.txPool.numTxs)
    for txHash, item in okPairs(ctx.txPool):
      txHashes.add txHash

    if txHashes.len == 0:
      return

    debug "announce tx hashes to newly connected peer",
      number = txHashes.len

    asyncSpawn ctx.sendNewTxHashes(txHashes, @[peer])
  else:
    discard

proc onPeerDisconnected(ctx: EthWireRef, peer: Peer) =
  debug "remove peer from knownByPeer",
    peer

  ctx.knownByPeer.del(peer)

proc setupPeerObserver(ctx: EthWireRef) =
  var po = PeerObserver(
    onPeerConnected:
      proc(p: Peer) {.gcsafe.} =
        ctx.onPeerConnected(p),
    onPeerDisconnected:
      proc(p: Peer) {.gcsafe.} =
        ctx.onPeerDisconnected(p))
  po.setProtocol protocol.eth
  ctx.peerPool.addObserver(ctx, po)

# ------------------------------------------------------------------------------
# Public constructor/destructor
# ------------------------------------------------------------------------------

proc new*(_: type EthWireRef,
          chain: ForkedChainRef,
          txPool: TxPoolRef,
          peerPool: PeerPool): EthWireRef =
  let ctx = EthWireRef(
    db: chain.db,
    chain: chain,
    txPool: txPool,
    peerPool: peerPool,
    lastCleanup: getTime())

  ctx.setupPeerObserver()
  ctx

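# Minimal wiring sketch (illustrative only; `chain`, `txPool` and `peerPool`
# are assumed to be set up elsewhere):
#
#   let wire = EthWireRef.new(chain, txPool, peerPool)
#
# The constructor also registers the peer observer, so connect/disconnect
# events start flowing into `onPeerConnected`/`onPeerDisconnected` right away.
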
# ------------------------------------------------------------------------------
# Public functions: eth wire protocol handlers
# ------------------------------------------------------------------------------

method getStatus*(ctx: EthWireRef): Result[EthState, string]
    {.gcsafe.} =
  let
    db = ctx.db
    com = ctx.chain.com
    bestBlock = ctx.chain.latestHeader
    forkId = com.forkId(bestBlock.number, bestBlock.timestamp)

  return ok(EthState(
    totalDifficulty: db.headTotalDifficulty,
    genesisHash: com.genesisHash,
    bestBlockHash: bestBlock.blockHash,
    forkId: ChainForkId(
      forkHash: forkId.crc.toBytesBE,
      forkNext: forkId.nextFork
    )
  ))

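# The fork identifier follows EIP-2124: `crc` is a 32-bit checksum over the
# genesis hash and the activated fork points, serialised big-endian into the
# 4-byte fork hash, while `nextFork` names the next scheduled fork (0 if none
# is known).
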
method getReceipts*(ctx: EthWireRef,
                    hashes: openArray[Hash32]):
                    Result[seq[seq[Receipt]], string]
    {.gcsafe.} =
  let db = ctx.db
  var list: seq[seq[Receipt]]
  for blockHash in hashes:
    let header = db.getBlockHeader(blockHash).valueOr:
      list.add @[]
      trace "handlers.getReceipts: blockHeader not found", blockHash
      continue
    let receiptList = ?db.getReceipts(header.receiptsRoot)
    list.add receiptList

  return ok(list)

method getPooledTxs*(ctx: EthWireRef,
                     hashes: openArray[Hash32]):
                     Result[seq[PooledTransaction], string]
    {.gcsafe.} =

  when txpool_enabled:
    let txPool = ctx.txPool
    var list: seq[PooledTransaction]
    for txHash in hashes:
      let res = txPool.getItem(txHash)
      if res.isOk:
        list.add res.value.pooledTx
      else:
        trace "handlers.getPooledTxs: tx not found", txHash
    ok(list)
  else:
    var list: seq[PooledTransaction]
    ok(list)

method getBlockBodies*(ctx: EthWireRef,
                       hashes: openArray[Hash32]):
                       Result[seq[BlockBody], string]
    {.gcsafe.} =
  let db = ctx.db
  var list: seq[BlockBody]
  for blockHash in hashes:
    let body = db.getBlockBody(blockHash).valueOr:
      list.add BlockBody()
      trace "handlers.getBlockBodies: blockBody not found", blockHash
      continue
    list.add body

  return ok(list)

method getBlockHeaders*(ctx: EthWireRef,
                        req: EthBlocksRequest):
                        Result[seq[Header], string]
    {.gcsafe.} =
  let db = ctx.db
  var list = newSeqOfCap[Header](req.maxResults)
  var foundBlock = db.blockHeader(req.startBlock).valueOr:
    return ok(list)
  list.add foundBlock

  while uint64(list.len) < req.maxResults:
    if not req.reverse:
      foundBlock = db.successorHeader(foundBlock, req.skip).valueOr:
        break
    else:
      foundBlock = db.ancestorHeader(foundBlock, req.skip).valueOr:
        break
    list.add foundBlock

  return ok(list)

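# Worked example (hypothetical request): startBlock = 100, maxResults = 3,
# skip = 2, reverse = false walks the chain in steps of 1 + skip = 3 and
# returns the headers of blocks 100, 103 and 106; with reverse = true it
# returns 100, 97 and 94 instead.
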
method handleAnnouncedTxs*(ctx: EthWireRef,
                           peer: Peer,
                           txs: openArray[Transaction]):
                           Result[void, string]
    {.gcsafe.} =

  when txpool_enabled:
    try:
      if txs.len == 0:
        return ok()

      debug "received new transactions",
        number = txs.len

      # Run a cleanup once the last one is older than the storage limit
      if getTime() - ctx.lastCleanup > POOLED_STORAGE_TIME_LIMIT:
        ctx.cleanupKnownByPeer()

      var txHashes = newSeqOfCap[Hash32](txs.len)
      for tx in txs:
        txHashes.add rlpHash(tx)

      ctx.addToKnownByPeer(txHashes, peer)
      for tx in txs:
        if tx.versionedHashes.len > 0:
          # EIP-4844 blobs are not persisted and cannot be broadcast
          continue
        ctx.txPool.add PooledTransaction(tx: tx)

      var newTxHashes = newSeqOfCap[Hash32](txHashes.len)
      var validTxs = newSeqOfCap[Transaction](txHashes.len)
      for i, txHash in txHashes:
        # Nodes must not automatically broadcast blob transactions to
        # their peers, per the EIP-4844 spec
        if ctx.txPool.inPoolAndOk(txHash) and txs[i].txType != TxEip4844:
          newTxHashes.add txHash
          validTxs.add txs[i]

      let
        peers = ctx.getPeers(peer)
        numPeers = peers.len
        sendFull = max(1, numPeers div NUM_PEERS_REBROADCAST_QUOTIENT)

      if numPeers == 0 or validTxs.len == 0:
        return ok()

      asyncSpawn ctx.sendTransactions(txHashes, validTxs, peers[0..<sendFull])

      asyncSpawn ctx.sendNewTxHashes(newTxHashes, peers[sendFull..^1])
      return ok()
    except CatchableError as exc:
      return err(exc.msg)
  else:
    ok()

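# Rebroadcast split: with NUM_PEERS_REBROADCAST_QUOTIENT = 4 and, say, 10
# other peers, sendFull = max(1, 10 div 4) = 2, so two peers receive the full
# transaction bodies and the remaining eight only get hash announcements.
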
method handleAnnouncedTxsHashes*(
    ctx: EthWireRef;
    peer: Peer;
    txTypes: seq[byte];
    txSizes: openArray[uint64];
    txHashes: openArray[Hash32];
      ): Result[void, string] =
  when extraTraceMessages:
    trace "Wire handler ignoring txs hashes", nHashes=txHashes.len
  ok()

method handleNewBlock*(ctx: EthWireRef,
                       peer: Peer,
                       blk: EthBlock,
                       totalDifficulty: DifficultyInt):
                       Result[void, string]
    {.gcsafe.} =
  # We dropped support for non-merge networks
  err("block broadcasts disallowed")

method handleNewBlockHashes*(ctx: EthWireRef,
                             peer: Peer,
                             hashes: openArray[NewBlockHashesAnnounce]):
                             Result[void, string]
    {.gcsafe.} =
  # We dropped support for non-merge networks
  err("block announcements disallowed")

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------