Added `web3.subscribeForBlockHeaders`; Stricter error handling

This commit is contained in:
Zahary Karadjov 2020-06-26 18:30:55 +03:00
parent 694ff2ad74
commit 4590f21d5d
No known key found for this signature in database
GPG Key ID: C8936F8A3073D609
2 changed files with 106 additions and 40 deletions

105
web3.nim
View File

@ -1,10 +1,12 @@
import macros, strutils, options, math, json, tables, uri, strformat
from os import DirSep
import import
nimcrypto, stint, httputils, chronicles, chronos, json_rpc/rpcclient, macros, strutils, options, math, json, tables, uri, strformat
stew/byteutils, eth/keys
import web3/[ethtypes, ethprocs, conversions, ethhexstrings, transaction_signing] from os import DirSep
import
nimcrypto, stint, httputils, chronicles, chronos,
json_rpc/[rpcclient, jsonmarshal], stew/byteutils, eth/keys,
web3/[ethtypes, ethprocs, conversions, ethhexstrings, transaction_signing]
template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0] template sourceDir: string = currentSourcePath.rsplit(DirSep, 1)[0]
@ -28,10 +30,16 @@ type
EncodeResult* = tuple[dynamic: bool, data: string] EncodeResult* = tuple[dynamic: bool, data: string]
SubscriptionEventHandler* = proc (j: JsonNode) {.gcsafe, raises: [Defect].}
SubscriptionErrorHandler* = proc (err: CatchableError) {.gcsafe, raises: [Defect].}
BlockHeaderHandler* = proc (b: BlockHeader) {.gcsafe, raises: [Defect].}
Subscription* = ref object Subscription* = ref object
id*: string id*: string
web3*: Web3 web3*: Web3
callback*: proc(j: JsonNode) {.gcsafe.} eventHandler*: SubscriptionEventHandler
errorHandler*: SubscriptionErrorHandler
pendingEvents: seq[JsonNode] pendingEvents: seq[JsonNode]
historicalEventsProcessed: bool historicalEventsProcessed: bool
removed: bool removed: bool
@ -48,11 +56,7 @@ proc handleSubscriptionNotification(w: Web3, j: JsonNode) =
let s = w.subscriptions.getOrDefault(j{"subscription"}.getStr()) let s = w.subscriptions.getOrDefault(j{"subscription"}.getStr())
if not s.isNil and not s.removed: if not s.isNil and not s.removed:
if s.historicalEventsProcessed: if s.historicalEventsProcessed:
try: s.eventHandler(j{"result"})
s.callback(j{"result"})
except CatchableError as e:
echo "Caught exception in handleSubscriptionNotification: ", e.msg
echo e.getStackTrace()
else: else:
s.pendingEvents.add(j) s.pendingEvents.add(j)
@ -91,29 +95,61 @@ proc getHistoricalEvents(s: Subscription, options: JsonNode) {.async.} =
let logs = await s.web3.provider.eth_getLogs(options) let logs = await s.web3.provider.eth_getLogs(options)
for l in logs: for l in logs:
if s.removed: break if s.removed: break
s.callback(l) s.eventHandler(l)
s.historicalEventsProcessed = true s.historicalEventsProcessed = true
var i = 0 var i = 0
while i < s.pendingEvents.len: # Mind reentrancy while i < s.pendingEvents.len: # Mind reentrancy
if s.removed: break if s.removed: break
s.callback(s.pendingEvents[i]) s.eventHandler(s.pendingEvents[i])
inc i inc i
s.pendingEvents = @[] s.pendingEvents = @[]
except CatchableError as e: except CatchableError as e:
echo "Caught exception in getHistoricalEvents: ", e.msg echo "Caught exception in getHistoricalEvents: ", e.msg
echo e.getStackTrace() echo e.getStackTrace()
proc subscribe*(w: Web3, name: string, options: JsonNode, callback: proc(j: JsonNode) {.gcsafe.}): Future[Subscription] {.async.} = proc subscribe*(w: Web3, name: string, options: JsonNode,
var options = options eventHandler: SubscriptionEventHandler,
if options.isNil: options = newJNull() errorHandler: SubscriptionErrorHandler): Future[Subscription]
let id = await w.provider.eth_subscribe(name, options) {.async.} =
result = Subscription(id: id, web3: w, callback: callback) ## Sets up a new subsciption using the `eth_subscribe` RPC call.
##
## May raise a `CatchableError` if the subscription is not established.
##
## Once the subscription is established, the `eventHandler` callback
## will be executed for each event of interest.
##
## In case of any errors or illegal behavior of the remote RPC node,
## the `errorHandler` will be executed with relevant information about
## the error.
let id = await w.provider.eth_subscribe(name, if options.isNil: newJNull()
else: options)
result = Subscription(id: id,
web3: w,
eventHandler: eventHandler,
errorHandler: errorHandler)
w.subscriptions[id] = result w.subscriptions[id] = result
proc subscribeToLogs*(w: Web3, options: JsonNode, callback: proc(j: JsonNode) {.gcsafe.}): Future[Subscription] {.async.} = proc subscribeForLogs*(w: Web3, options: JsonNode,
result = await subscribe(w, "logs", options, callback) logsHandler: SubscriptionEventHandler,
errorHandler: SubscriptionErrorHandler): Future[Subscription]
{.async.} =
result = await subscribe(w, "logs", options, logsHandler, errorHandler)
discard getHistoricalEvents(result, options) discard getHistoricalEvents(result, options)
proc subscribeForBlockHeaders*(w: Web3, options: JsonNode,
blockHeadersCallback: proc(b: BlockHeader) {.gcsafe, raises: [Defect].},
errorHandler: SubscriptionErrorHandler): Future[Subscription]
{.async.} =
proc eventHandler(json: JsonNode) {.gcsafe, raises: [Defect].} =
var blk: BlockHeader
try: fromJson(json, "result", blk)
except CatchableError as err: errorHandler(err[])
blockHeadersCallback(blk)
result = await subscribe(w, "newHeads", options, eventHandler, errorHandler)
result.historicalEventsProcessed = true
proc unsubscribe*(s: Subscription): Future[void] {.async.} = proc unsubscribe*(s: Subscription): Future[void] {.async.} =
s.web3.subscriptions.del(s.id) s.web3.subscriptions.del(s.id)
s.removed = true s.removed = true
@ -680,22 +716,35 @@ macro contract*(cname: untyped, body: untyped): untyped =
proc subscribe(s: Sender[`cname`], proc subscribe(s: Sender[`cname`],
t: type `cbident`, t: type `cbident`,
options: JsonNode, options: JsonNode,
`callbackIdent`: `procTy`): Future[Subscription] = `callbackIdent`: `procTy`,
errorHandler: SubscriptionErrorHandler): Future[Subscription] =
let options = addAddressAndSignatureToOptions(options, s.contractAddress, eventTopic(`cbident`)) let options = addAddressAndSignatureToOptions(options, s.contractAddress, eventTopic(`cbident`))
s.web3.subscribeToLogs(options) do(`jsonIdent`: JsonNode): proc eventHandler(`jsonIdent`: JsonNode) {.gcsafe, raises: [Defect].} =
`argParseBody` try:
`call` `argParseBody`
`call`
except CatchableError as err:
errorHandler err[]
s.web3.subscribeForLogs(options, eventHandler, errorHandler)
proc subscribe(s: Sender[`cname`], proc subscribe(s: Sender[`cname`],
t: type `cbident`, t: type `cbident`,
options: JsonNode, options: JsonNode,
`callbackIdent`: `procTyWithRawData`): Future[Subscription] = `callbackIdent`: `procTyWithRawData`,
errorHandler: SubscriptionErrorHandler): Future[Subscription] =
let options = addAddressAndSignatureToOptions(options, s.contractAddress, eventTopic(`cbident`)) let options = addAddressAndSignatureToOptions(options, s.contractAddress, eventTopic(`cbident`))
s.web3.subscribeToLogs(options) do(`jsonIdent`: JsonNode): proc eventHandler(`jsonIdent`: JsonNode) {.gcsafe, raises: [Defect].} =
`argParseBody` try:
`callWithRawData` `argParseBody`
`callWithRawData`
except CatchableError as err:
errorHandler err[]
s.web3.subscribeForLogs(options, eventHandler, errorHandler)
else: else:
discard discard

View File

@ -62,26 +62,43 @@ type
# value*: int # (optional) Integer of the value sent with this transaction. # value*: int # (optional) Integer of the value sent with this transaction.
# data*: int # (optional) Hash of the method signature and encoded parameters. For details see Ethereum Contract ABI. # data*: int # (optional) Hash of the method signature and encoded parameters. For details see Ethereum Contract ABI.
## A block header object
BlockHeader* = ref object
difficulty*: Quantity
extraData*: string
gasLimit*: Quantity
gasUsed*: Quantity
logsBloom*: FixedBytes[256]
miner*: Address
nonce*: Quantity
number*: Quantity
parentHash*: BlockHash
receiptsRoot*: BlockHash
sha3Uncles*: BlockHash
stateRoot*: BlockHash
timestamp*: Quantity
transactionsRoot*: BlockHash
## A block object, or null when no block was found ## A block object, or null when no block was found
BlockObject* = ref object BlockObject* = ref object
number*: Quantity # the block number. null when its pending block.
hash*: BlockHash # hash of the block. null when its pending block.
parentHash*: BlockHash # hash of the parent block.
nonce*: Quantity # hash of the generated proof-of-work. null when its pending block.
sha3Uncles*: UInt256 # SHA3 of the uncles data in the block.
logsBloom*: FixedBytes[256] # the bloom filter for the logs of the block. null when its pending block.
transactionsRoot*: UInt256 # the root of the transaction trie of the block.
stateRoot*: UInt256 # the root of the final state trie of the block.
receiptsRoot*: UInt256 # the root of the receipts trie of the block.
miner*: Address # the address of the beneficiary to whom the mining rewards were given.
difficulty*: Quantity # integer of the difficulty for this block. difficulty*: Quantity # integer of the difficulty for this block.
totalDifficulty*: Quantity # integer of the total difficulty of the chain until this block.
extraData*: string # the "extra data" field of this block. extraData*: string # the "extra data" field of this block.
size*: Quantity # integer the size of this block in bytes.
gasLimit*: Quantity # the maximum gas allowed in this block. gasLimit*: Quantity # the maximum gas allowed in this block.
gasUsed*: Quantity # the total used gas by all transactions in this block. gasUsed*: Quantity # the total used gas by all transactions in this block.
hash*: BlockHash # hash of the block. null when its pending block.
logsBloom*: FixedBytes[256] # the bloom filter for the logs of the block. null when its pending block.
miner*: Address # the address of the beneficiary to whom the mining rewards were given.
nonce*: Quantity # hash of the generated proof-of-work. null when its pending block.
number*: Quantity # the block number. null when its pending block.
parentHash*: BlockHash # hash of the parent block.
receiptsRoot*: UInt256 # the root of the receipts trie of the block.
sha3Uncles*: UInt256 # SHA3 of the uncles data in the block.
size*: Quantity # integer the size of this block in bytes.
stateRoot*: UInt256 # the root of the final state trie of the block.
timestamp*: Quantity # the unix timestamp for when the block was collated. timestamp*: Quantity # the unix timestamp for when the block was collated.
totalDifficulty*: Quantity # integer of the total difficulty of the chain until this block.
transactions*: seq[TxHash] # list of transaction objects, or 32 Bytes transaction hashes depending on the last given parameter. transactions*: seq[TxHash] # list of transaction objects, or 32 Bytes transaction hashes depending on the last given parameter.
transactionsRoot*: UInt256 # the root of the transaction trie of the block.
uncles*: seq[BlockHash] # list of uncle hashes. uncles*: seq[BlockHash] # list of uncle hashes.
TransactionObject* = object # A transaction object, or null when no transaction was found: TransactionObject* = object # A transaction object, or null when no transaction was found: