From f0eefcf13bae21d285942919431ad246cee89c7b Mon Sep 17 00:00:00 2001
From: web3-developer <51288821+web3-developer@users.noreply.github.com>
Date: Tue, 23 Jul 2024 14:24:57 +0800
Subject: [PATCH] Fetch data in batches.

---
 .../portal_bridge/portal_bridge_common.nim    | 74 ++++++++++++++++++
 .../portal_bridge/portal_bridge_state.nim     | 80 ++++++++++---------
 .../portal_bridge/state_bridge/state_diff.nim | 46 +++++++++--
 3 files changed, 156 insertions(+), 44 deletions(-)

diff --git a/fluffy/tools/portal_bridge/portal_bridge_common.nim b/fluffy/tools/portal_bridge/portal_bridge_common.nim
index 1e7b852b1..3dbe08dde 100644
--- a/fluffy/tools/portal_bridge/portal_bridge_common.nim
+++ b/fluffy/tools/portal_bridge/portal_bridge_common.nim
@@ -9,6 +9,7 @@
 
 import
   chronicles,
+  json_serialization,
   json_rpc/rpcclient,
   web3/[eth_api, eth_api_types],
   ../../rpc/rpc_calls/rpc_trace_calls,
@@ -51,6 +52,79 @@ proc getBlockByNumber*(
 
   return ok(blck)
 
+# type BlockObjectLite* = ref object
+#   number*: BlockNumber # the block number. null when its pending block.
+#   hash*: Hash256 # hash of the block. null when its pending block.
+#   #parentHash*: Hash256 # hash of the parent block.
+#   #sha3Uncles*: Hash256 # SHA3 of the uncles data in the block.
+#   #logsBloom*: FixedBytes[256] # the bloom filter for the logs of the block. null when its pending block.
+#   #transactionsRoot*: Hash256 # the root of the transaction trie of the block.
+#   stateRoot*: Hash256 # the root of the final state trie of the block.
+#   #receiptsRoot*: Hash256 # the root of the receipts trie of the block.
+#   miner*: Address # the address of the beneficiary to whom the mining rewards were given.
+#   #difficulty*: UInt256 # integer of the difficulty for this block.
+#   #extraData*: HistoricExtraData # the "extra data" field of this block.
+#   #gasLimit*: Quantity # the maximum gas allowed in this block.
+#   #gasUsed*: Quantity # the total used gas by all transactions in this block.
+#   #timestamp*: Quantity # the unix timestamp for when the block was collated.
+#   #nonce*: Opt[FixedBytes[8]] # hash of the generated proof-of-work. null when its pending block.
+#   #mixHash*: Hash256
+#   #size*: Quantity # integer the size of this block in bytes.
+#   #totalDifficulty*: UInt256 # integer of the total difficulty of the chain until this block.
+#   #transactions*: seq[TxOrHash] # list of transaction objects, or 32 Bytes transaction hashes depending on the last given parameter.
+#   uncles*: seq[Hash256] # list of uncle hashes.
+#   #baseFeePerGas*: Opt[UInt256] # EIP-1559
+#   #withdrawals*: Opt[seq[WithdrawalObject]] # EIP-4895
+#   #withdrawalsRoot*: Opt[Hash256] # EIP-4895
+#   #blobGasUsed*: Opt[Quantity] # EIP-4844
+#   #excessBlobGas*: Opt[Quantity] # EIP-4844
+#   #parentBeaconBlockRoot*: Opt[Hash256] # EIP-4788
+
+# createJsonFlavor JsonC,
+#   # automaticObjectSerialization = false,
+#   # requireAllFields = false,
+#   allowUnknownFields = true
+
+# BlockObjectLite.useDefaultSerializationIn JsonC
+
+proc getBlocksByNumber*(
+    client: RpcClient, startBlock: uint64, batchSize: int
+): Future[Result[seq[BlockObject], string]] {.async: (raises: []).} =
+  ## Fetches `batchSize` consecutive blocks, starting at `startBlock`, with a
+  ## single JSON-RPC batch of eth_getBlockByNumber calls. Entry `i` of the
+  ## result is decoded from response `i`. Fails when the batch call fails,
+  ## when a response is missing or when any single request failed.
+  ## NOTE(review): assumes responses come back in request order - confirm.
+  try:
+    let batch = client.prepareBatch()
+
+    for i in 0 ..< batchSize:
+      batch.eth_getBlockByNumber(blockId(startBlock + uint64(i)), false)
+
+    # Don't call get() here: on a failed Result it raises a Defect, which
+    # the except branch below does not catch.
+    let res = (await batch.send()).valueOr:
+      return err("EL JSON-RPC eth_getBlockByNumber batch failed: " & error)
+
+    # Guard the indexing below against a truncated batch response.
+    if res.len != batchSize:
+      return err("EL JSON-RPC eth_getBlockByNumber batch is incomplete")
+
+    var blockObjs = newSeqOfCap[BlockObject](batchSize)
+    for i in 0 ..< batchSize:
+      if res[i].error.isSome():
+        return err("EL JSON-RPC eth_getBlockByNumber failed: " & res[i].error.get())
+
+      let blockObj = Json.decode(res[i].result.string, BlockObject)
+      # Guard against a nil decode result (presumably a JSON `null` block).
+      if blockObj.isNil:
+        return err("EL failed to provide requested block")
+      blockObjs.add(blockObj)
+
+    return ok(blockObjs)
+  except CatchableError as e:
+    return err("EL JSON-RPC eth_getBlockByNumber failed: " & e.msg)
+
 proc getUncleByBlockNumberAndIndex*(
     client: RpcClient, blockId: BlockIdentifier, index: Quantity
 ): Future[Result[BlockObject, string]] {.async: (raises: []).} =
diff --git a/fluffy/tools/portal_bridge/portal_bridge_state.nim b/fluffy/tools/portal_bridge/portal_bridge_state.nim
index cb0314f06..e9a759302 100644
--- 
a/fluffy/tools/portal_bridge/portal_bridge_state.nim
+++ b/fluffy/tools/portal_bridge/portal_bridge_state.nim
@@ -46,57 +46,68 @@ proc runBackfillCollectBlockDataLoop(
     warn "Using a WebSocket connection to the JSON-RPC API is recommended to improve performance"
 
   var currentBlockNumber = startBlockNumber
+  let batchSize = 1000
 
   while true:
-    if currentBlockNumber mod 100000 == 0:
-      info "Collecting block data for block number: ", blockNumber = currentBlockNumber
-
-    let
-      blockId = blockId(currentBlockNumber)
-      blockRequest = web3Client.getBlockByNumber(blockId, false)
-      stateDiffsRequest = web3Client.getStateDiffsByBlockNumber(blockId)
-
-      blockObject = (await blockRequest).valueOr:
-        error "Failed to get block", error
-        await sleepAsync(1.seconds)
-        continue
-
-    var uncleBlockRequests: seq[Future[Result[BlockObject, string]]]
-    for i in 0 .. blockObject.uncles.high:
-      uncleBlockRequests.add(
-        web3Client.getUncleByBlockNumberAndIndex(blockId, i.Quantity)
-      )
-
-    let stateDiffs = (await stateDiffsRequest).valueOr:
-      error "Failed to get state diffs", error
-      await sleepAsync(1.seconds)
-      continue
-
-    var uncleBlocks: seq[BlockObject]
-    for uncleBlockRequest in uncleBlockRequests:
-      try:
-        let uncleBlock = (await uncleBlockRequest).valueOr:
-          error "Failed to get uncle blocks", error
-          await sleepAsync(1.seconds)
-          break
-        uncleBlocks.add(uncleBlock)
-      except CatchableError as e:
-        error "Failed to get uncleBlockRequest", error = e.msg
-        break
-
-    if uncleBlocks.len() < uncleBlockRequests.len():
-      continue
-
-    await blockDataQueue.addLast(
-      BlockDataRef(
-        blockNumber: currentBlockNumber,
-        blockObject: blockObject,
-        stateDiffs: stateDiffs,
-        uncleBlocks: uncleBlocks,
-      )
-    )
-
-    inc currentBlockNumber
+    let
+      blockObjects = (
+        await web3Client.getBlocksByNumber(currentBlockNumber, batchSize)
+      ).valueOr:
+        error "Failed to get blocks", error
+        await sleepAsync(1.seconds)
+        continue
+      stateDiffs = (
+        await web3Client.getStateDiffsByBlockNumber(currentBlockNumber, batchSize)
+      ).valueOr:
+        error "Failed to get state diffs", error
+        await sleepAsync(1.seconds)
+        continue
+
+    # Counts the blocks of this batch that were handed to the queue, so that a
+    # failure part way through resumes at the failed block instead of skipping it.
+    var processed = 0
+    for j in 0 ..< batchSize:
+      let
+        blockNumber = currentBlockNumber + j.uint64
+        blockObject = blockObjects[j]
+
+      if blockNumber mod 10000 == 0:
+        info "Collecting block data for block number: ", blockNumber = blockNumber
+
+      var uncleBlockRequests: seq[Future[Result[BlockObject, string]]]
+      for i in 0 .. blockObject.uncles.high:
+        uncleBlockRequests.add(
+          web3Client.getUncleByBlockNumberAndIndex(blockId(blockNumber), i.Quantity)
+        )
+
+      var uncleBlocks: seq[BlockObject]
+      for uncleBlockRequest in uncleBlockRequests:
+        try:
+          let uncleBlock = (await uncleBlockRequest).valueOr:
+            error "Failed to get uncle blocks", error
+            await sleepAsync(1.seconds)
+            break
+          uncleBlocks.add(uncleBlock)
+        except CatchableError as e:
+          error "Failed to get uncleBlockRequest", error = e.msg
+          break
+
+      if uncleBlocks.len() < uncleBlockRequests.len():
+        # A `continue` here would drop this block and move on to the next one.
+        # Leave the batch instead so the outer loop refetches from this block.
+        break
+
+      await blockDataQueue.addLast(
+        BlockDataRef(
+          blockNumber: blockNumber,
+          blockObject: blockObject,
+          stateDiffs: stateDiffs[j],
+          uncleBlocks: uncleBlocks,
+        )
+      )
+      inc processed
+
+    currentBlockNumber += processed.uint64
 
 proc runBackfillBuildBlockOffersLoop(
     blockDataQueue: AsyncQueue[BlockDataRef],
@@ -142,15 +153,15 @@ proc runBackfillBuildBlockOffersLoop(
   while true:
     let blockData = await blockDataQueue.popFirst()
 
-    if blockData.blockNumber mod 100000 == 0:
+    if blockData.blockNumber mod 10000 == 0:
       info "Building state for block number: ", blockNumber = blockData.blockNumber
 
     # For now all WorldStateRef functions need to be inside a transaction
     # because the DatabaseRef currently only supports reading and writing to/from
     # a single active transaction.
     db.withTransaction:
       defer:
         worldState.clearPreimages()
 
       for stateDiff in blockData.stateDiffs:
         worldState.applyStateDiff(stateDiff)
diff --git a/fluffy/tools/portal_bridge/state_bridge/state_diff.nim b/fluffy/tools/portal_bridge/state_bridge/state_diff.nim
index 46562186e..7cce4db98 100644
--- a/fluffy/tools/portal_bridge/state_bridge/state_diff.nim
+++ b/fluffy/tools/portal_bridge/state_bridge/state_diff.nim
@@ -11,6 +11,9 @@ import
   chronicles,
   stew/byteutils,
   stint,
+  json_serialization,
+  json_rpc/rpcclient,
+  web3/[eth_api, eth_api_types],
   eth/common/[eth_types, eth_types_rlp],
   ../../../rpc/rpc_calls/rpc_trace_calls,
   ../portal_bridge_common
@@ -98,15 +101,46 @@ proc toStateDiffs(
 
   stateDiffs
 
 proc getStateDiffsByBlockNumber*(
-    client: RpcClient, blockId: BlockIdentifier
-): Future[Result[seq[StateDiffRef], string]] {.async: (raises: []).} =
+    client: RpcClient, startBlock: uint64, batchSize: int
+): Future[Result[seq[seq[StateDiffRef]], string]] {.async: (raises: []).} =
+  ## Fetches the state diffs of `batchSize` consecutive blocks, starting at
+  ## `startBlock`, with a single JSON-RPC batch of
+  ## trace_replayBlockTransactions calls. Entry `i` of the result is built
+  ## from response `i`. Fails when the batch call fails, when a response is
+  ## missing or when any single request failed.
+  ## NOTE(review): assumes responses come back in request order - confirm.
   const traceOpts = @["stateDiff"]
 
   try:
-    let blockTraceJson = await client.trace_replayBlockTransactions(blockId, traceOpts)
-    if blockTraceJson.isNil:
-      return err("EL failed to provide requested state diff")
-    ok(blockTraceJson.toStateDiffs())
+    let batch = client.prepareBatch()
+
+    for i in 0 ..< batchSize:
+      batch.trace_replayBlockTransactions(blockId(startBlock + uint64(i)), traceOpts)
+
+    # Don't call get() here: on a failed Result it raises a Defect, which
+    # the except branch below does not catch.
+    let res = (await batch.send()).valueOr:
+      return err("EL JSON-RPC trace_replayBlockTransactions batch failed: " & error)
+
+    # Guard the indexing below against a truncated batch response.
+    if res.len != batchSize:
+      return err("EL JSON-RPC trace_replayBlockTransactions batch is incomplete")
+
+    var stateDiffsObjs = newSeqOfCap[seq[StateDiffRef]](batchSize)
+    for i in 0 ..< batchSize:
+      if res[i].error.isSome():
+        return err(
+          "EL JSON-RPC trace_replayBlockTransactions failed: " & res[i].error.get()
+        )
+
+      let blockTraceJson = Json.decode(res[i].result.string, JsonNode)
+      # Keep the nil check of the single request code and also reject a JSON
+      # `null` (presumably what the EL sends when it has no trace).
+      if blockTraceJson.isNil or blockTraceJson.kind == JNull:
+        return err("EL failed to provide requested state diff")
+      stateDiffsObjs.add(blockTraceJson.toStateDiffs())
+
+    ok(stateDiffsObjs)
   except CatchableError as e:
     return err("EL JSON-RPC trace_replayBlockTransactions failed: " & e.msg)