Fetch data in batches.

This commit is contained in:
web3-developer 2024-07-23 14:24:57 +08:00
parent 0af15e8b0c
commit f0eefcf13b
No known key found for this signature in database
GPG Key ID: 496A1BB40CE64F40
3 changed files with 140 additions and 44 deletions

View File

@ -9,6 +9,7 @@
import import
chronicles, chronicles,
json_serialization,
json_rpc/rpcclient, json_rpc/rpcclient,
web3/[eth_api, eth_api_types], web3/[eth_api, eth_api_types],
../../rpc/rpc_calls/rpc_trace_calls, ../../rpc/rpc_calls/rpc_trace_calls,
@ -51,6 +52,63 @@ proc getBlockByNumber*(
return ok(blck) return ok(blck)
# type BlockObjectLite* = ref object
# number*: BlockNumber # the block number. null when its pending block.
# hash*: Hash256 # hash of the block. null when its pending block.
# #parentHash*: Hash256 # hash of the parent block.
# #sha3Uncles*: Hash256 # SHA3 of the uncles data in the block.
# #logsBloom*: FixedBytes[256] # the bloom filter for the logs of the block. null when its pending block.
# #transactionsRoot*: Hash256 # the root of the transaction trie of the block.
# stateRoot*: Hash256 # the root of the final state trie of the block.
# #receiptsRoot*: Hash256 # the root of the receipts trie of the block.
# miner*: Address # the address of the beneficiary to whom the mining rewards were given.
# #difficulty*: UInt256 # integer of the difficulty for this block.
# #extraData*: HistoricExtraData # the "extra data" field of this block.
# #gasLimit*: Quantity # the maximum gas allowed in this block.
# #gasUsed*: Quantity # the total used gas by all transactions in this block.
# #timestamp*: Quantity # the unix timestamp for when the block was collated.
# #nonce*: Opt[FixedBytes[8]] # hash of the generated proof-of-work. null when its pending block.
# #mixHash*: Hash256
# #size*: Quantity # integer the size of this block in bytes.
# #totalDifficulty*: UInt256 # integer of the total difficulty of the chain until this block.
# #transactions*: seq[TxOrHash] # list of transaction objects, or 32 Bytes transaction hashes depending on the last given parameter.
# uncles*: seq[Hash256] # list of uncle hashes.
# #baseFeePerGas*: Opt[UInt256] # EIP-1559
# #withdrawals*: Opt[seq[WithdrawalObject]] # EIP-4895
# #withdrawalsRoot*: Opt[Hash256] # EIP-4895
# #blobGasUsed*: Opt[Quantity] # EIP-4844
# #excessBlobGas*: Opt[Quantity] # EIP-4844
# #parentBeaconBlockRoot*: Opt[Hash256] # EIP-4788
# createJsonFlavor JsonC,
# # automaticObjectSerialization = false,
# # requireAllFields = false,
# allowUnknownFields = true
# BlockObjectLite.useDefaultSerializationIn JsonC
proc getBlocksByNumber*(
client: RpcClient, startBlock: uint64, batchSize: int
): Future[Result[seq[BlockObject], string]] {.async: (raises: []).} =
let blck =
try:
let batch = client.prepareBatch()
for i in 0 ..< batchSize:
batch.eth_getBlockByNumber(blockId(startBlock + uint64(i)), false)
let res = (await batch.send()).get()
var blockObjs = newSeqOfCap[BlockObject](batchSize)
for i in 0 ..< batchSize:
blockObjs.add(Json.decode(res[i].result.string, BlockObject))
blockObjs
except CatchableError as e:
return err("EL JSON-RPC eth_getBlockByNumber failed: " & e.msg)
return ok(blck)
proc getUncleByBlockNumberAndIndex*( proc getUncleByBlockNumberAndIndex*(
client: RpcClient, blockId: BlockIdentifier, index: Quantity client: RpcClient, blockId: BlockIdentifier, index: Quantity
): Future[Result[BlockObject, string]] {.async: (raises: []).} = ): Future[Result[BlockObject, string]] {.async: (raises: []).} =

View File

@ -46,31 +46,35 @@ proc runBackfillCollectBlockDataLoop(
warn "Using a WebSocket connection to the JSON-RPC API is recommended to improve performance" warn "Using a WebSocket connection to the JSON-RPC API is recommended to improve performance"
var currentBlockNumber = startBlockNumber var currentBlockNumber = startBlockNumber
let batchSize = 1000
while true: while true:
if currentBlockNumber mod 100000 == 0: if currentBlockNumber mod 10000 == 0:
info "Collecting block data for block number: ", blockNumber = currentBlockNumber info "Collecting block data for block number: ", blockNumber = currentBlockNumber
let let
blockId = blockId(currentBlockNumber) blockId = blockId(currentBlockNumber)
blockRequest = web3Client.getBlockByNumber(blockId, false) blockObjects = (await web3Client.getBlocksByNumber(currentBlockNumber, batchSize)).valueOr:
stateDiffsRequest = web3Client.getStateDiffsByBlockNumber(blockId) error "Failed to get blocks", error
blockObject = (await blockRequest).valueOr:
error "Failed to get block", error
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
continue continue
stateDiffs = (
await web3Client.getStateDiffsByBlockNumber(currentBlockNumber, batchSize)
).valueOr:
error "Failed to get state diffs", error
await sleepAsync(1.seconds)
continue
for j in 0 ..< batchSize:
let blockObject = blockObjects[j]
var uncleBlockRequests: seq[Future[Result[BlockObject, string]]] var uncleBlockRequests: seq[Future[Result[BlockObject, string]]]
for i in 0 .. blockObject.uncles.high: for i in 0 .. blockObject.uncles.high:
uncleBlockRequests.add( uncleBlockRequests.add(
web3Client.getUncleByBlockNumberAndIndex(blockId, i.Quantity) web3Client.getUncleByBlockNumberAndIndex(
blockId(currentBlockNumber + j.uint64), i.Quantity
)
) )
let stateDiffs = (await stateDiffsRequest).valueOr:
error "Failed to get state diffs", error
await sleepAsync(1.seconds)
continue
var uncleBlocks: seq[BlockObject] var uncleBlocks: seq[BlockObject]
for uncleBlockRequest in uncleBlockRequests: for uncleBlockRequest in uncleBlockRequests:
@ -89,14 +93,14 @@ proc runBackfillCollectBlockDataLoop(
await blockDataQueue.addLast( await blockDataQueue.addLast(
BlockDataRef( BlockDataRef(
blockNumber: currentBlockNumber, blockNumber: currentBlockNumber + j.uint64,
blockObject: blockObject, blockObject: blockObject,
stateDiffs: stateDiffs, stateDiffs: stateDiffs[j],
uncleBlocks: uncleBlocks, uncleBlocks: uncleBlocks,
) )
) )
inc currentBlockNumber currentBlockNumber += batchSize.uint64
proc runBackfillBuildBlockOffersLoop( proc runBackfillBuildBlockOffersLoop(
blockDataQueue: AsyncQueue[BlockDataRef], blockDataQueue: AsyncQueue[BlockDataRef],
@ -142,15 +146,15 @@ proc runBackfillBuildBlockOffersLoop(
while true: while true:
let blockData = await blockDataQueue.popFirst() let blockData = await blockDataQueue.popFirst()
if blockData.blockNumber mod 100000 == 0: if blockData.blockNumber mod 10000 == 0:
info "Building state for block number: ", blockNumber = blockData.blockNumber info "Building state for block number: ", blockNumber = blockData.blockNumber
# For now all WorldStateRef functions need to be inside a transaction # For now all WorldStateRef functions need to be inside a transaction
# because the DatabaseRef currently only supports reading and writing to/from # because the DatabaseRef currently only supports reading and writing to/from
# a single active transaction. # a single active transaction.
db.withTransaction: db.withTransaction:
defer: # defer:
worldState.clearPreimages() # worldState.clearPreimages()
for stateDiff in blockData.stateDiffs: for stateDiff in blockData.stateDiffs:
worldState.applyStateDiff(stateDiff) worldState.applyStateDiff(stateDiff)

View File

@ -11,6 +11,9 @@ import
chronicles, chronicles,
stew/byteutils, stew/byteutils,
stint, stint,
json_serialization,
json_rpc/rpcclient,
web3/[eth_api, eth_api_types],
eth/common/[eth_types, eth_types_rlp], eth/common/[eth_types, eth_types_rlp],
../../../rpc/rpc_calls/rpc_trace_calls, ../../../rpc/rpc_calls/rpc_trace_calls,
../portal_bridge_common ../portal_bridge_common
@ -98,15 +101,46 @@ proc toStateDiffs(
stateDiffs stateDiffs
# proc getBlocksByNumber*(
# client: RpcClient, startBlock: uint64, batchSize: int
# ): Future[Result[seq[BlockObject], string]] {.async: (raises: []).} =
# let blck =
# try:
# let batch = client.prepareBatch()
# for i in 0 ..< batchSize:
# batch.eth_getBlockByNumber(blockId(startBlock + uint64(i)), false)
# let res = (await batch.send()).get()
# var blockObjs = newSeqOfCap[BlockObject](batchSize)
# for i in 0 ..< batchSize:
# blockObjs.add(Json.decode(res[i].result.string, BlockObject))
# blockObjs
# except CatchableError as e:
# return err("EL JSON-RPC eth_getBlockByNumber failed: " & e.msg)
# return ok(blck)
proc getStateDiffsByBlockNumber*( proc getStateDiffsByBlockNumber*(
client: RpcClient, blockId: BlockIdentifier client: RpcClient, startBlock: uint64, batchSize: int
): Future[Result[seq[StateDiffRef], string]] {.async: (raises: []).} = ): Future[Result[seq[seq[StateDiffRef]], string]] {.async: (raises: []).} =
const traceOpts = @["stateDiff"] const traceOpts = @["stateDiff"]
try: try:
let blockTraceJson = await client.trace_replayBlockTransactions(blockId, traceOpts) let batch = client.prepareBatch()
if blockTraceJson.isNil:
return err("EL failed to provide requested state diff") for i in 0 ..< batchSize:
ok(blockTraceJson.toStateDiffs()) batch.trace_replayBlockTransactions(blockId(startBlock + uint64(i)), traceOpts)
let res = (await batch.send()).get()
var stateDiffsObjs = newSeqOfCap[seq[StateDiffRef]](batchSize)
for i in 0 ..< batchSize:
stateDiffsObjs.add(Json.decode(res[i].result.string, JsonNode).toStateDiffs())
ok(stateDiffsObjs)
except CatchableError as e: except CatchableError as e:
return err("EL JSON-RPC trace_replayBlockTransactions failed: " & e.msg) return err("EL JSON-RPC trace_replayBlockTransactions failed: " & e.msg)