Use one ws connection to transfer data from source (#1136)

This commit is contained in:
KonradStaniec 2022-06-22 08:50:58 +02:00 committed by GitHub
parent 8dce76c49f
commit 196428dcf5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 31 deletions

View File

@ -36,6 +36,7 @@ import
faststreams, chronicles,
eth/[common, rlp], chronos,
eth/common/eth_types_json_serialization,
json_rpc/rpcclient,
../seed_db,
../../premix/downloader,
../network/history/history_content
@ -113,12 +114,10 @@ proc writeBlock(writer: var JsonWriter, blck: Block) {.raises: [IOError, Defect]
writer.writeField(headerHash, dataRecord)
proc downloadBlock(i: uint64): Block =
proc downloadBlock(i: uint64, client: RpcClient): Block =
let num = u256(i)
try:
# premix has hardcoded making request to local host which is "127.0.0.1:8545"
# which is defult port of geth json rpc server
return requestBlock(num, flags = {DownloadReceipts})
return requestBlock(num, flags = {DownloadReceipts}, client = some(client))
except CatchableError as e:
fatal "Error while requesting Block", error = e.msg, number = i
quit 1
@ -154,14 +153,14 @@ proc createAndOpenFile(config: ExporterConf): OutputStreamHandle =
fatal "Error occurred while opening the file", error = e.msg
quit 1
proc writeToJson(config: ExporterConf) =
proc writeToJson(config: ExporterConf, client: RpcClient) =
let fh = createAndOpenFile(config)
try:
var writer = JsonWriter[DefaultFlavor].init(fh.s)
writer.beginRecord()
for i in config.initialBlock..config.endBlock:
let blck = downloadBlock(i)
let blck = downloadBlock(i, client)
writer.writeBlock(blck)
writer.endRecord()
info "File successfully written"
@ -175,14 +174,15 @@ proc writeToJson(config: ExporterConf) =
fatal "Error occoured while closing file", error = e.msg
quit 1
proc writeToDb(config: ExporterConf) =
proc writeToDb(config: ExporterConf, client: RpcClient) =
let db = SeedDb.new(distinctBase(config.dataDir), config.filename)
defer:
db.close()
for i in config.initialBlock..config.endBlock:
let
blck = downloadBlock(i)
blck = downloadBlock(i, client)
blockHash = blck.header.blockHash()
contentKeyType = BlockKey(chainId: 1, blockHash: blockHash)
headerKey = encode(ContentKey(contentType: blockHeader, blockHeaderKey: contentKeyType))
@ -200,12 +200,12 @@ proc writeToDb(config: ExporterConf) =
info "Data successfuly written to db"
proc run(config: ExporterConf) =
proc run(config: ExporterConf, client: RpcClient) =
case config.storageMode
of Json:
writeToJson(config)
writeToJson(config, client)
of Db:
writeToDb(config)
writeToDb(config, client)
when isMainModule:
{.pop.}
@ -220,4 +220,19 @@ when isMainModule:
setLogLevel(config.logLevel)
run(config)
var client: RpcClient
try:
let c = newRpcWebSocketClient()
# TODO Currently hardcoded to default geth ws address, at some point it may
# be moved to config
waitFor c.connect("ws://127.0.0.1:8546")
client = c
except CatchableError as e:
fatal "Error while connecting to data provider", error = e.msg
quit 1
try:
run(config, client)
finally:
waitFor client.close()

View File

@ -18,14 +18,23 @@ type
DownloadTxTrace
DownloadAndValidate
proc request*(methodName: string, params: JsonNode): JsonNode =
var client = newRpcHttpClient()
#client.httpMethod(MethodPost)
waitFor client.connect("127.0.0.1", Port(8545), false)
result = waitFor client.call(methodName, params)
waitFor client.close()
proc request*(
methodName: string,
params: JsonNode,
client: Option[RpcClient] = none[RpcClient]()): JsonNode =
if client.isSome():
result = waitFor client.unsafeGet().call(methodName, params)
else:
var client = newRpcHttpClient()
#client.httpMethod(MethodPost)
waitFor client.connect("127.0.0.1", Port(8545), false)
result = waitFor client.call(methodName, params)
waitFor client.close()
proc requestBlockBody(n: JsonNode, blockNumber: BlockNumber): BlockBody =
proc requestBlockBody(
n: JsonNode,
blockNumber: BlockNumber,
client: Option[RpcClient] = none[RpcClient]()): BlockBody =
let txs = n["transactions"]
if txs.len > 0:
result.transactions = newSeqOfCap[Transaction](txs.len)
@ -40,53 +49,62 @@ proc requestBlockBody(n: JsonNode, blockNumber: BlockNumber): BlockBody =
let blockNumber = blockNumber.prefixHex
for i in 0 ..< uncles.len:
let idx = i.prefixHex
let uncle = request("eth_getUncleByBlockNumberAndIndex", %[%blockNumber, %idx])
let uncle = request("eth_getUncleByBlockNumberAndIndex", %[%blockNumber, %idx], client)
if uncle.kind == JNull:
error "requested uncle not available", blockNumber=blockNumber, uncleIdx=i
raise newException(ValueError, "Error when retrieving block uncles")
result.uncles.add parseBlockHeader(uncle)
proc requestReceipts(n: JsonNode): seq[Receipt] =
proc requestReceipts(
n: JsonNode,
client: Option[RpcClient] = none[RpcClient]()): seq[Receipt] =
let txs = n["transactions"]
if txs.len > 0:
result = newSeqOfCap[Receipt](txs.len)
for tx in txs:
let txHash = tx["hash"]
let rec = request("eth_getTransactionReceipt", %[txHash])
let rec = request("eth_getTransactionReceipt", %[txHash], client)
if rec.kind == JNull:
error "requested receipt not available", txHash=txHash
raise newException(ValueError, "Error when retrieving block receipts")
result.add parseReceipt(rec)
proc requestTxTraces(n: JsonNode): JsonNode =
proc requestTxTraces(
n: JsonNode,
client: Option[RpcClient] = none[RpcClient]()): JsonNode =
result = newJArray()
let txs = n["transactions"]
if txs.len == 0: return
for tx in txs:
let txHash = tx["hash"]
let txTrace = request("debug_traceTransaction", %[txHash])
let txTrace = request("debug_traceTransaction", %[txHash], client)
if txTrace.kind == JNull:
error "requested trace not available", txHash=txHash
raise newException(ValueError, "Error when retrieving transaction trace")
result.add txTrace
proc requestHeader*(blockNumber: BlockNumber): JsonNode =
result = request("eth_getBlockByNumber", %[%blockNumber.prefixHex, %true])
proc requestHeader*(
blockNumber: BlockNumber,
client: Option[RpcClient] = none[RpcClient]()): JsonNode =
result = request("eth_getBlockByNumber", %[%blockNumber.prefixHex, %true], client)
if result.kind == JNull:
error "requested block not available", blockNumber=blockNumber
raise newException(ValueError, "Error when retrieving block header")
proc requestBlock*(blockNumber: BlockNumber, flags: set[DownloadFlags] = {}): Block =
let header = requestHeader(blockNumber)
proc requestBlock*(
blockNumber: BlockNumber,
flags: set[DownloadFlags] = {},
client: Option[RpcClient] = none[RpcClient]()): Block =
let header = requestHeader(blockNumber, client)
result.jsonData = header
result.header = parseBlockHeader(header)
result.body = requestBlockBody(header, blockNumber)
result.body = requestBlockBody(header, blockNumber, client)
if DownloadTxTrace in flags:
result.traces = requestTxTraces(header)
result.traces = requestTxTraces(header, client)
if DownloadReceipts in flags:
result.receipts = requestReceipts(header)
result.receipts = requestReceipts(header, client)
let
receiptRoot = calcReceiptRoot(result.receipts).prefixHex
receiptRootOK = result.header.receiptRoot.prefixHex