diff --git a/fluffy/tools/eth_data_exporter.nim b/fluffy/tools/eth_data_exporter.nim index cfa3b8c04..6a036cc65 100644 --- a/fluffy/tools/eth_data_exporter.nim +++ b/fluffy/tools/eth_data_exporter.nim @@ -36,6 +36,7 @@ import faststreams, chronicles, eth/[common, rlp], chronos, eth/common/eth_types_json_serialization, + json_rpc/rpcclient, ../seed_db, ../../premix/downloader, ../network/history/history_content @@ -113,12 +114,10 @@ proc writeBlock(writer: var JsonWriter, blck: Block) {.raises: [IOError, Defect] writer.writeField(headerHash, dataRecord) -proc downloadBlock(i: uint64): Block = +proc downloadBlock(i: uint64, client: RpcClient): Block = let num = u256(i) try: - # premix has hardcoded making request to local host which is "127.0.0.1:8545" - # which is defult port of geth json rpc server - return requestBlock(num, flags = {DownloadReceipts}) + return requestBlock(num, flags = {DownloadReceipts}, client = some(client)) except CatchableError as e: fatal "Error while requesting Block", error = e.msg, number = i quit 1 @@ -154,14 +153,14 @@ proc createAndOpenFile(config: ExporterConf): OutputStreamHandle = fatal "Error occurred while opening the file", error = e.msg quit 1 -proc writeToJson(config: ExporterConf) = +proc writeToJson(config: ExporterConf, client: RpcClient) = let fh = createAndOpenFile(config) try: var writer = JsonWriter[DefaultFlavor].init(fh.s) writer.beginRecord() for i in config.initialBlock..config.endBlock: - let blck = downloadBlock(i) + let blck = downloadBlock(i, client) writer.writeBlock(blck) writer.endRecord() info "File successfully written" @@ -175,14 +174,15 @@ proc writeToJson(config: ExporterConf) = fatal "Error occoured while closing file", error = e.msg quit 1 -proc writeToDb(config: ExporterConf) = +proc writeToDb(config: ExporterConf, client: RpcClient) = let db = SeedDb.new(distinctBase(config.dataDir), config.filename) + defer: db.close() for i in config.initialBlock..config.endBlock: let - blck = downloadBlock(i) + blck = downloadBlock(i, client) blockHash = blck.header.blockHash() contentKeyType = BlockKey(chainId: 1, blockHash: blockHash) headerKey = encode(ContentKey(contentType: blockHeader, blockHeaderKey: contentKeyType)) @@ -200,12 +200,12 @@ proc writeToDb(config: ExporterConf) = info "Data successfuly written to db" -proc run(config: ExporterConf) = +proc run(config: ExporterConf, client: RpcClient) = case config.storageMode of Json: - writeToJson(config) + writeToJson(config, client) of Db: - writeToDb(config) + writeToDb(config, client) when isMainModule: {.pop.} @@ -220,4 +220,19 @@ when isMainModule: setLogLevel(config.logLevel) - run(config) + var client: RpcClient + + try: + let c = newRpcWebSocketClient() + # TODO Currently hardcoded to default geth ws address, at some point it may + # be moved to config + waitFor c.connect("ws://127.0.0.1:8546") + client = c + except CatchableError as e: + fatal "Error while connecting to data provider", error = e.msg + quit 1 + + try: + run(config, client) + finally: + waitFor client.close() diff --git a/premix/downloader.nim b/premix/downloader.nim index 4c0627aea..e9e42a20d 100644 --- a/premix/downloader.nim +++ b/premix/downloader.nim @@ -18,14 +18,23 @@ type DownloadTxTrace DownloadAndValidate -proc request*(methodName: string, params: JsonNode): JsonNode = - var client = newRpcHttpClient() - #client.httpMethod(MethodPost) - waitFor client.connect("127.0.0.1", Port(8545), false) - result = waitFor client.call(methodName, params) - waitFor client.close() +proc request*( + methodName: string, + params: JsonNode, + client: Option[RpcClient] = none[RpcClient]()): JsonNode = + if client.isSome(): + result = waitFor client.unsafeGet().call(methodName, params) + else: + var client = newRpcHttpClient() + #client.httpMethod(MethodPost) + waitFor client.connect("127.0.0.1", Port(8545), false) + result = waitFor client.call(methodName, params) + waitFor client.close() -proc requestBlockBody(n: JsonNode, blockNumber: BlockNumber): BlockBody = +proc requestBlockBody( + n: JsonNode, + blockNumber: BlockNumber, + client: Option[RpcClient] = none[RpcClient]()): BlockBody = let txs = n["transactions"] if txs.len > 0: result.transactions = newSeqOfCap[Transaction](txs.len) @@ -40,53 +49,62 @@ proc requestBlockBody(n: JsonNode, blockNumber: BlockNumber): BlockBody = let blockNumber = blockNumber.prefixHex for i in 0 ..< uncles.len: let idx = i.prefixHex - let uncle = request("eth_getUncleByBlockNumberAndIndex", %[%blockNumber, %idx]) + let uncle = request("eth_getUncleByBlockNumberAndIndex", %[%blockNumber, %idx], client) if uncle.kind == JNull: error "requested uncle not available", blockNumber=blockNumber, uncleIdx=i raise newException(ValueError, "Error when retrieving block uncles") result.uncles.add parseBlockHeader(uncle) -proc requestReceipts(n: JsonNode): seq[Receipt] = +proc requestReceipts( + n: JsonNode, + client: Option[RpcClient] = none[RpcClient]()): seq[Receipt] = let txs = n["transactions"] if txs.len > 0: result = newSeqOfCap[Receipt](txs.len) for tx in txs: let txHash = tx["hash"] - let rec = request("eth_getTransactionReceipt", %[txHash]) + let rec = request("eth_getTransactionReceipt", %[txHash], client) if rec.kind == JNull: error "requested receipt not available", txHash=txHash raise newException(ValueError, "Error when retrieving block receipts") result.add parseReceipt(rec) -proc requestTxTraces(n: JsonNode): JsonNode = +proc requestTxTraces( + n: JsonNode, + client: Option[RpcClient] = none[RpcClient]()): JsonNode = result = newJArray() let txs = n["transactions"] if txs.len == 0: return for tx in txs: let txHash = tx["hash"] - let txTrace = request("debug_traceTransaction", %[txHash]) + let txTrace = request("debug_traceTransaction", %[txHash], client) if txTrace.kind == JNull: error "requested trace not available", txHash=txHash raise newException(ValueError, "Error when retrieving transaction trace") result.add txTrace -proc requestHeader*(blockNumber: BlockNumber): JsonNode = - result = request("eth_getBlockByNumber", %[%blockNumber.prefixHex, %true]) +proc requestHeader*( + blockNumber: BlockNumber, + client: Option[RpcClient] = none[RpcClient]()): JsonNode = + result = request("eth_getBlockByNumber", %[%blockNumber.prefixHex, %true], client) if result.kind == JNull: error "requested block not available", blockNumber=blockNumber raise newException(ValueError, "Error when retrieving block header") -proc requestBlock*(blockNumber: BlockNumber, flags: set[DownloadFlags] = {}): Block = - let header = requestHeader(blockNumber) +proc requestBlock*( + blockNumber: BlockNumber, + flags: set[DownloadFlags] = {}, + client: Option[RpcClient] = none[RpcClient]()): Block = + let header = requestHeader(blockNumber, client) result.jsonData = header result.header = parseBlockHeader(header) - result.body = requestBlockBody(header, blockNumber) + result.body = requestBlockBody(header, blockNumber, client) if DownloadTxTrace in flags: - result.traces = requestTxTraces(header) + result.traces = requestTxTraces(header, client) if DownloadReceipts in flags: - result.receipts = requestReceipts(header) + result.receipts = requestReceipts(header, client) let receiptRoot = calcReceiptRoot(result.receipts).prefixHex receiptRootOK = result.header.receiptRoot.prefixHex