feat: subscriptions handlers get results

This commit is contained in:
Adam Uhlíř 2024-10-31 10:12:04 +01:00
parent d60cedbb98
commit 5a1bab5a19
No known key found for this signature in database
GPG Key ID: 1D17A9E81F76155B
11 changed files with 136 additions and 61 deletions

View File

@ -131,14 +131,20 @@ You can now subscribe to Transfer events by calling `subscribe` on the contract
instance.
```nim
proc handleTransfer(transfer: Transfer) =
echo "received transfer: ", transfer
proc handleTransfer(transferResult: ?!Transfer) =
if transferResult.isOk:
echo "received transfer: ", transferResult.value
let subscription = await token.subscribe(Transfer, handleTransfer)
```
When a Transfer event is emitted, the `handleTransfer` proc that you just
defined will be called.
defined will be called with a [Result](https://github.com/arnetheduck/nim-results) type
which contains the event value.
In case there is some underlying error in the event subscription, the handler will
be called as well, but the Result will contain error instead, so do proper error
management in your handlers.
When you're no longer interested in these events, you can unsubscribe:

View File

@ -3,6 +3,7 @@ import std/macros
import std/sequtils
import pkg/chronicles
import pkg/chronos
import pkg/questionable
import pkg/contractabi
import ./basics
import ./provider
@ -35,11 +36,10 @@ type
gasLimit*: ?UInt256
CallOverrides* = ref object of TransactionOverrides
blockTag*: ?BlockTag
ContractError* = object of EthersError
Confirmable* = object
response*: ?TransactionResponse
convert*: ConvertCustomErrors
EventHandler*[E: Event] = proc(event: E) {.gcsafe, raises:[].}
EventHandler*[E: Event] = proc(event: ?!E) {.gcsafe, raises:[].}
func new*(ContractType: type Contract,
address: Address,
@ -292,9 +292,13 @@ proc subscribe*[E: Event](contract: Contract,
let topic = topic($E, E.fieldTypes).toArray
let filter = EventFilter(address: contract.address, topics: @[topic])
proc logHandler(log: Log) {.raises: [].} =
proc logHandler(logResult: ?!Log) {.raises: [].} =
without log =? logResult, err:
handler(failure(E, err))
return
if event =? E.decode(log.data, log.topics):
handler(event)
handler(success(event))
contract.provider.subscribe(filter, logHandler)

View File

@ -1,7 +1,23 @@
import ./basics
type SolidityError* = object of EthersError
type
SolidityError* = object of EthersError
ContractError* = object of EthersError
SignerError* = object of EthersError
SubscriptionError* = object of EthersError
ProviderError* = object of EthersError
data*: ?seq[byte]
template raiseSignerError*(message: string, parent: ref ProviderError = nil) =
raise newException(SignerError, message, parent)
{.push raises:[].}
proc toErr*[E1: ref CatchableError, E2: EthersError](
e1: E1,
_: type E2,
msg: string = e1.msg): ref E2 =
return newException(E2, msg, e1)
template errors*(types) {.pragma.}

View File

@ -1,5 +1,6 @@
import pkg/chronicles
import pkg/serde
import pkg/questionable
import ./basics
import ./transaction
import ./blocktag
@ -8,13 +9,12 @@ import ./errors
export basics
export transaction
export blocktag
export errors
{.push raises: [].}
type
Provider* = ref object of RootObj
ProviderError* = object of EthersError
data*: ?seq[byte]
EstimateGasError* = object of ProviderError
transaction*: Transaction
Subscription* = ref object of RootObj
@ -56,8 +56,8 @@ type
effectiveGasPrice*: ?UInt256
status*: TransactionStatus
transactionType* {.serialize("type"), deserialize("type").}: TransactionType
LogHandler* = proc(log: Log) {.gcsafe, raises:[].}
BlockHandler* = proc(blck: Block) {.gcsafe, raises:[].}
LogHandler* = proc(log: ?!Log) {.gcsafe, raises:[].}
BlockHandler* = proc(blck: ?!Block) {.gcsafe, raises:[].}
Topic* = array[32, byte]
Block* {.serialize.} = object
number*: ?UInt256
@ -227,7 +227,7 @@ proc confirm*(
tx: TransactionResponse,
confirmations = EthersDefaultConfirmations,
timeout = EthersReceiptTimeoutBlks): Future[TransactionReceipt]
{.async: (raises: [CancelledError, ProviderError, EthersError]).} =
{.async: (raises: [CancelledError, ProviderError, SubscriptionError, EthersError]).} =
## Waits for a transaction to be mined and for the specified number of blocks
## to pass since it was mined (confirmations).
@ -235,13 +235,25 @@ proc confirm*(
## blocks have passed without the tx having been mined.
var blockNumber: UInt256
var blockSubscriptionError: ref SubscriptionError
let blockEvent = newAsyncEvent()
proc onBlockNumber(number: UInt256) =
blockNumber = number
blockEvent.fire()
proc onBlock(blck: Block) =
proc onBlock(blckResult: ?!Block) =
without blck =? blckResult, error:
let err = blckResult.error()
if err of SubscriptionError:
blockSubscriptionError = cast[ref SubscriptionError](err)
else:
echo "What to do now? 😳"
blockEvent.fire()
return
if number =? blck.number:
onBlockNumber(number)
@ -255,6 +267,9 @@ proc confirm*(
await blockEvent.wait()
blockEvent.clear()
if not isNil(blockSubscriptionError):
raise blockSubscriptionError
if blockNumber >= finish:
await subscription.unsubscribe()
raise newException(EthersError, "tx not mined before timeout")

View File

@ -1,9 +1,12 @@
import std/strutils
import pkg/stew/byteutils
import ../../basics
import ../../errors
import ../../provider
import ./conversions
export errors
{.push raises:[].}
type JsonRpcProviderError* = object of ProviderError

View File

@ -2,9 +2,11 @@ import std/tables
import std/sequtils
import std/strutils
import pkg/chronos
import pkg/questionable
import pkg/json_rpc/rpcclient
import pkg/serde
import ../../basics
import ../../errors
import ../../provider
include ../../nimshims/hashes
import ./rpccalls
@ -19,8 +21,7 @@ type
callbacks: Table[JsonNode, SubscriptionCallback]
methodHandlers: Table[string, MethodHandler]
MethodHandler* = proc (j: JsonNode) {.gcsafe, raises: [].}
SubscriptionCallback = proc(id, arguments: JsonNode) {.gcsafe, raises:[].}
SubscriptionError* = object of EthersError
SubscriptionCallback = proc(id: JsonNode, arguments: ?!JsonNode) {.gcsafe, raises:[].}
{.push raises:[].}
@ -56,7 +57,7 @@ proc setMethodHandler(
method subscribeBlocks*(subscriptions: JsonRpcSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async, base.} =
{.async, base, raises: [CancelledError].} =
raiseAssert "not implemented"
method subscribeLogs*(subscriptions: JsonRpcSubscriptions,
@ -77,14 +78,16 @@ method close*(subscriptions: JsonRpcSubscriptions) {.async, base.} =
await subscriptions.unsubscribe(id)
proc getCallback(subscriptions: JsonRpcSubscriptions,
id: JsonNode): ?SubscriptionCallback =
id: JsonNode): ?SubscriptionCallback {. raises:[].} =
try:
if not id.isNil and id in subscriptions.callbacks:
subscriptions.callbacks[id].some
try:
return subscriptions.callbacks[id].some
except: discard
else:
SubscriptionCallback.none
return SubscriptionCallback.none
except KeyError:
SubscriptionCallback.none
return SubscriptionCallback.none
# Web sockets
@ -98,17 +101,22 @@ proc new*(_: type JsonRpcSubscriptions,
proc subscriptionHandler(arguments: JsonNode) {.raises:[].} =
let id = arguments{"subscription"} or newJString("")
if callback =? subscriptions.getCallback(id):
callback(id, arguments)
callback(id, success(arguments))
subscriptions.setMethodHandler("eth_subscription", subscriptionHandler)
subscriptions
method subscribeBlocks(subscriptions: WebSocketSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async.} =
proc callback(id, arguments: JsonNode) {.raises: [].} =
{.async, raises: [].} =
proc callback(id: JsonNode, argumentsResult: ?!JsonNode) {.raises: [].} =
without arguments =? argumentsResult, error:
onBlock(failure(Block, error.toErr(SubscriptionError)))
return
if blck =? Block.fromJson(arguments{"result"}):
onBlock(blck)
onBlock(success(blck))
let id = await subscriptions.client.eth_subscribe("newHeads")
subscriptions.callbacks[id] = callback
return id
@ -118,9 +126,14 @@ method subscribeLogs(subscriptions: WebSocketSubscriptions,
onLog: LogHandler):
Future[JsonNode]
{.async.} =
proc callback(id, arguments: JsonNode) =
proc callback(id: JsonNode, argumentsResult: ?!JsonNode) =
without arguments =? argumentsResult, error:
onLog(failure(Log, error.toErr(SubscriptionError)))
return
if log =? Log.fromJson(arguments{"result"}):
onLog(log)
onLog(success(log))
let id = await subscriptions.client.eth_subscribe("logs", filter)
subscriptions.callbacks[id] = callback
return id
@ -152,7 +165,7 @@ proc new*(_: type JsonRpcSubscriptions,
let subscriptions = PollingSubscriptions(client: client)
proc getChanges(originalId: JsonNode): Future[JsonNode] {.async.} =
proc getChanges(originalId: JsonNode): Future[JsonNode] {.async, raises:[CancelledError, SubscriptionError].} =
try:
let mappedId = subscriptions.subscriptionMapping[originalId]
let changes = await subscriptions.client.eth_getFilterChanges(mappedId)
@ -172,12 +185,19 @@ proc new*(_: type JsonRpcSubscriptions,
subscriptions.subscriptionMapping[originalId] = newId
return await getChanges(originalId)
else:
raise e
raise newException(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg, e)
proc poll(id: JsonNode) {.async.} =
for change in await getChanges(id):
if callback =? subscriptions.getCallback(id):
callback(id, change)
proc poll(id: JsonNode) {.async: (raises: [CancelledError, SubscriptionError]).} =
without callback =? subscriptions.getCallback(id):
return
try:
for change in await getChanges(id):
callback(id, success(change))
except CancelledError as e:
raise e
except CatchableError as e:
callback(id, failure(JsonNode, e))
proc poll {.async.} =
untilCancelled:
@ -195,16 +215,23 @@ method close*(subscriptions: PollingSubscriptions) {.async.} =
method subscribeBlocks(subscriptions: PollingSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async.} =
{.async, raises:[CancelledError].} =
proc getBlock(hash: BlockHash) {.async.} =
proc getBlock(hash: BlockHash) {.async: (raises:[]).} =
try:
if blck =? (await subscriptions.client.eth_getBlockByHash(hash, false)):
onBlock(blck)
except CatchableError:
onBlock(success(blck))
except CancelledError as e:
discard
except CatchableError as e:
let wrappedErr = newException(SubscriptionError, "HTTP polling: There was an exception while getting subscription's block: " & e.msg, e)
onBlock(failure(Block, wrappedErr))
proc callback(id: JsonNode, changeResult: ?!JsonNode) {.raises:[].} =
without change =? changeResult, error:
onBlock(failure(Block, error.toErr(SubscriptionError)))
return
proc callback(id, change: JsonNode) =
if hash =? BlockHash.fromJson(change):
asyncSpawn getBlock(hash)
@ -219,9 +246,13 @@ method subscribeLogs(subscriptions: PollingSubscriptions,
Future[JsonNode]
{.async.} =
proc callback(id, change: JsonNode) =
proc callback(id: JsonNode, changeResult: ?!JsonNode) =
without change =? changeResult, error:
onLog(failure(Log, error.toErr(SubscriptionError)))
return
if log =? Log.fromJson(change):
onLog(log)
onLog(success(log))
let id = await subscriptions.client.eth_newFilter(filter)
subscriptions.callbacks[id] = callback

View File

@ -1,18 +1,16 @@
import pkg/questionable
import ./basics
import ./errors
import ./provider
export basics
export errors
{.push raises: [].}
type
Signer* = ref object of RootObj
populateLock: AsyncLock
SignerError* = object of EthersError
template raiseSignerError(message: string, parent: ref ProviderError = nil) =
raise newException(SignerError, message, parent)
template convertError(body) =
try:

View File

@ -2,7 +2,6 @@ import ../../examples
import ../../../ethers/provider
import ../../../ethers/providers/jsonrpc/conversions
import std/tables
import std/sequtils
import pkg/stew/byteutils
import pkg/json_rpc/rpcserver except `%`, `%*`

View File

@ -49,7 +49,7 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
let oldBlock = !await provider.getBlock(BlockTag.latest)
discard await provider.send("evm_mine")
var newBlock: Block
let blockHandler = proc(blck: Block) = newBlock = blck
let blockHandler = proc(blck: ?!Block) {.raises:[].}= newBlock = blck.value
let subscription = await provider.subscribe(blockHandler)
discard await provider.send("evm_mine")
check eventually newBlock.number.isSome
@ -98,7 +98,7 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
expect JsonRpcProviderError:
discard await provider.getBlock(BlockTag.latest)
expect JsonRpcProviderError:
discard await provider.subscribe(proc(_: Block) = discard)
discard await provider.subscribe(proc(_: ?!Block) = discard)
expect JsonRpcProviderError:
discard await provider.getSigner().sendTransaction(Transaction.example)

View File

@ -1,4 +1,3 @@
import pkg/serde
import std/os
import std/sequtils
import std/importutils
@ -28,8 +27,8 @@ template subscriptionTests(subscriptions, client) =
test "subscribes to new blocks":
var latestBlock: Block
proc callback(blck: Block) =
latestBlock = blck
proc callback(blck: ?!Block) =
latestBlock = blck.value
let subscription = await subscriptions.subscribeBlocks(callback)
discard await client.call("evm_mine", newJArray())
check eventually latestBlock.number.isSome
@ -39,8 +38,9 @@ template subscriptionTests(subscriptions, client) =
test "stops listening to new blocks when unsubscribed":
var count = 0
proc callback(blck: Block) =
inc count
proc callback(blck: ?!Block) =
if blck.isOk:
inc count
let subscription = await subscriptions.subscribeBlocks(callback)
discard await client.call("evm_mine", newJArray())
check eventually count > 0
@ -52,8 +52,9 @@ template subscriptionTests(subscriptions, client) =
test "stops listening to new blocks when provider is closed":
var count = 0
proc callback(blck: Block) =
inc count
proc callback(blck: ?!Block) =
if blck.isOk:
inc count
discard await subscriptions.subscribeBlocks(callback)
discard await client.call("evm_mine", newJArray())
check eventually count > 0
@ -126,7 +127,7 @@ suite "HTTP polling subscriptions - filter not found":
test "filter not found error recreates filter":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: Log) = discard
let emptyHandler = proc(log: ?!Log) = discard
check subscriptions.filters.len == 0
check subscriptions.subscriptionMapping.len == 0
@ -144,7 +145,7 @@ suite "HTTP polling subscriptions - filter not found":
test "recreated filter can be still unsubscribed using the original id":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: Log) = discard
let emptyHandler = proc(log: ?!Log) = discard
let id = await subscriptions.subscribeLogs(filter, emptyHandler)
mockServer.invalidateFilter(id)
check eventually subscriptions.subscriptionMapping[id] != id

View File

@ -149,8 +149,9 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
test "receives events when subscribed":
var transfers: seq[Transfer]
proc handleTransfer(transfer: Transfer) =
transfers.add(transfer)
proc handleTransfer(transferRes: ?!Transfer) =
if transfer =? transferRes:
transfers.add(transfer)
let signer0 = provider.getSigner(accounts[0])
let signer1 = provider.getSigner(accounts[1])
@ -171,8 +172,9 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
test "stops receiving events when unsubscribed":
var transfers: seq[Transfer]
proc handleTransfer(transfer: Transfer) =
transfers.add(transfer)
proc handleTransfer(transferRes: ?!Transfer) =
if transfer =? transferRes:
transfers.add(transfer)
let signer0 = provider.getSigner(accounts[0])
@ -312,4 +314,4 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
check receipt2.status == TransactionStatus.Success
let balanceAfter = await token.myBalance()
check balanceAfter == balanceBefore + 200.u256
check balanceAfter == balanceBefore + 200.u256