feat: subscriptions handlers get results

This commit is contained in:
Adam Uhlíř 2024-10-31 10:12:04 +01:00
parent 1ae2cd4a35
commit 5ec16dbf8d
No known key found for this signature in database
GPG Key ID: 1D17A9E81F76155B
10 changed files with 150 additions and 69 deletions

View File

@ -131,14 +131,20 @@ You can now subscribe to Transfer events by calling `subscribe` on the contract
instance.
```nim
proc handleTransfer(transfer: Transfer) =
echo "received transfer: ", transfer
proc handleTransfer(transferResult: ?!Transfer) =
if transferResult.isOk:
echo "received transfer: ", transferResult.value
let subscription = await token.subscribe(Transfer, handleTransfer)
```
When a Transfer event is emitted, the `handleTransfer` proc that you just
defined will be called.
defined will be called with a [Result](https://github.com/arnetheduck/nim-results) type
which contains the event value.
In case there is some underlying error in the event subscription, the handler will
be called as well, but the Result will contain error instead, so do proper error
management in your handlers.
When you're no longer interested in these events, you can unsubscribe:

View File

@ -3,6 +3,7 @@ import std/macros
import std/sequtils
import pkg/chronicles
import pkg/chronos
import pkg/questionable
import pkg/contractabi
import ./basics
import ./provider
@ -35,11 +36,10 @@ type
gasLimit*: ?UInt256
CallOverrides* = ref object of TransactionOverrides
blockTag*: ?BlockTag
ContractError* = object of EthersError
Confirmable* = object
response*: ?TransactionResponse
convert*: ConvertCustomErrors
EventHandler*[E: Event] = proc(event: E) {.gcsafe, raises:[].}
EventHandler*[E: Event] = proc(event: ?!E) {.gcsafe, raises:[].}
func new*(ContractType: type Contract,
address: Address,
@ -292,9 +292,13 @@ proc subscribe*[E: Event](contract: Contract,
let topic = topic($E, E.fieldTypes).toArray
let filter = EventFilter(address: contract.address, topics: @[topic])
proc logHandler(log: Log) {.raises: [].} =
proc logHandler(logResult: ?!Log) {.raises: [].} =
without log =? logResult, err:
handler(failure(E, err))
return
if event =? E.decode(log.data, log.topics):
handler(event)
handler(success(event))
contract.provider.subscribe(filter, logHandler)

View File

@ -1,7 +1,23 @@
import ./basics
type SolidityError* = object of EthersError
type
SolidityError* = object of EthersError
ContractError* = object of EthersError
SignerError* = object of EthersError
SubscriptionError* = object of EthersError
ProviderError* = object of EthersError
data*: ?seq[byte]
template raiseSignerError*(message: string, parent: ref ProviderError = nil) =
raise newException(SignerError, message, parent)
{.push raises:[].}
proc toErr*[E1: ref CatchableError, E2: EthersError](
e1: E1,
_: type E2,
msg: string = e1.msg): ref E2 =
return newException(E2, msg, e1)
template errors*(types) {.pragma.}

View File

@ -1,5 +1,6 @@
import pkg/chronicles
import pkg/serde
import pkg/questionable
import ./basics
import ./transaction
import ./blocktag
@ -8,13 +9,12 @@ import ./errors
export basics
export transaction
export blocktag
export errors
{.push raises: [].}
type
Provider* = ref object of RootObj
ProviderError* = object of EthersError
data*: ?seq[byte]
EstimateGasError* = object of ProviderError
transaction*: Transaction
Subscription* = ref object of RootObj
@ -56,8 +56,8 @@ type
effectiveGasPrice*: ?UInt256
status*: TransactionStatus
transactionType* {.serialize("type"), deserialize("type").}: TransactionType
LogHandler* = proc(log: Log) {.gcsafe, raises:[].}
BlockHandler* = proc(blck: Block) {.gcsafe, raises:[].}
LogHandler* = proc(log: ?!Log) {.gcsafe, raises:[].}
BlockHandler* = proc(blck: ?!Block) {.gcsafe, raises:[].}
Topic* = array[32, byte]
Block* {.serialize.} = object
number*: ?UInt256
@ -227,7 +227,7 @@ proc confirm*(
tx: TransactionResponse,
confirmations = EthersDefaultConfirmations,
timeout = EthersReceiptTimeoutBlks): Future[TransactionReceipt]
{.async: (raises: [CancelledError, ProviderError, EthersError]).} =
{.async: (raises: [CancelledError, ProviderError, SubscriptionError, EthersError]).} =
## Waits for a transaction to be mined and for the specified number of blocks
## to pass since it was mined (confirmations). The number of confirmations
@ -238,6 +238,7 @@ proc confirm*(
assert confirmations > 0
var blockNumber: UInt256
var blockSubscriptionError: ref SubscriptionError
let blockEvent = newAsyncEvent()
proc updateBlockNumber {.async: (raises: []).} =
@ -250,7 +251,18 @@ proc confirm*(
# there's nothing we can do here
discard
proc onBlock(_: Block) =
proc onBlock(blckResult: ?!Block) =
without blck =? blckResult, error:
let err = blckResult.error()
if err of SubscriptionError:
blockSubscriptionError = cast[ref SubscriptionError](err)
else:
echo "What to do now? 😳"
blockEvent.fire()
return
# ignore block parameter; hardhat may call this with pending blocks
asyncSpawn updateBlockNumber()
@ -264,6 +276,9 @@ proc confirm*(
await blockEvent.wait()
blockEvent.clear()
if not isNil(blockSubscriptionError):
raise blockSubscriptionError
if blockNumber >= finish:
await subscription.unsubscribe()
raise newException(EthersError, "tx not mined before timeout")

View File

@ -1,9 +1,12 @@
import std/strutils
import pkg/stew/byteutils
import ../../basics
import ../../errors
import ../../provider
import ./conversions
export errors
{.push raises:[].}
type JsonRpcProviderError* = object of ProviderError

View File

@ -1,9 +1,12 @@
import std/tables
import std/sequtils
import std/strutils
import pkg/chronos
import pkg/questionable
import pkg/json_rpc/rpcclient
import pkg/serde
import ../../basics
import ../../errors
import ../../provider
include ../../nimshims/hashes
import ./rpccalls
@ -17,8 +20,7 @@ type
callbacks: Table[JsonNode, SubscriptionCallback]
methodHandlers: Table[string, MethodHandler]
MethodHandler* = proc (j: JsonNode) {.gcsafe, raises: [].}
SubscriptionCallback = proc(id, arguments: JsonNode) {.gcsafe, raises:[].}
SubscriptionError* = object of EthersError
SubscriptionCallback = proc(id: JsonNode, arguments: ?!JsonNode) {.gcsafe, raises:[].}
{.push raises:[].}
@ -54,7 +56,7 @@ proc setMethodHandler(
method subscribeBlocks*(subscriptions: JsonRpcSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async, base.} =
{.async, base, raises: [CancelledError].} =
raiseAssert "not implemented"
method subscribeLogs*(subscriptions: JsonRpcSubscriptions,
@ -75,14 +77,16 @@ method close*(subscriptions: JsonRpcSubscriptions) {.async, base.} =
await subscriptions.unsubscribe(id)
proc getCallback(subscriptions: JsonRpcSubscriptions,
id: JsonNode): ?SubscriptionCallback =
id: JsonNode): ?SubscriptionCallback {. raises:[].} =
try:
if not id.isNil and id in subscriptions.callbacks:
subscriptions.callbacks[id].some
try:
return subscriptions.callbacks[id].some
except: discard
else:
SubscriptionCallback.none
return SubscriptionCallback.none
except KeyError:
SubscriptionCallback.none
return SubscriptionCallback.none
# Web sockets
@ -96,17 +100,22 @@ proc new*(_: type JsonRpcSubscriptions,
proc subscriptionHandler(arguments: JsonNode) {.raises:[].} =
let id = arguments{"subscription"} or newJString("")
if callback =? subscriptions.getCallback(id):
callback(id, arguments)
callback(id, success(arguments))
subscriptions.setMethodHandler("eth_subscription", subscriptionHandler)
subscriptions
method subscribeBlocks(subscriptions: WebSocketSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async.} =
proc callback(id, arguments: JsonNode) {.raises: [].} =
{.async, raises: [].} =
proc callback(id: JsonNode, argumentsResult: ?!JsonNode) {.raises: [].} =
without arguments =? argumentsResult, error:
onBlock(failure(Block, error.toErr(SubscriptionError)))
return
if blck =? Block.fromJson(arguments{"result"}):
onBlock(blck)
onBlock(success(blck))
let id = await subscriptions.client.eth_subscribe("newHeads")
subscriptions.callbacks[id] = callback
return id
@ -116,9 +125,14 @@ method subscribeLogs(subscriptions: WebSocketSubscriptions,
onLog: LogHandler):
Future[JsonNode]
{.async.} =
proc callback(id, arguments: JsonNode) =
proc callback(id: JsonNode, argumentsResult: ?!JsonNode) =
without arguments =? argumentsResult, error:
onLog(failure(Log, error.toErr(SubscriptionError)))
return
if log =? Log.fromJson(arguments{"result"}):
onLog(log)
onLog(success(log))
let id = await subscriptions.client.eth_subscribe("logs", filter)
subscriptions.callbacks[id] = callback
return id
@ -150,7 +164,7 @@ proc new*(_: type JsonRpcSubscriptions,
let subscriptions = PollingSubscriptions(client: client)
proc resubscribe(id: JsonNode) {.async: (raises: [CancelledError]).} =
proc resubscribe(id: JsonNode) {.async: (raises: [CancelledError, SubscriptionError]).} =
try:
var newId: JsonNode
# Log filters are stored in logFilters, block filters are not persisted
@ -164,34 +178,44 @@ proc new*(_: type JsonRpcSubscriptions,
subscriptions.subscriptionMapping[id] = newId
except CancelledError as error:
raise error
except CatchableError:
# there's nothing further we can do here
discard
except CatchableError as e:
raise newException(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg, e)
proc getChanges(id: JsonNode): Future[JsonNode] {.async: (raises: [CancelledError]).} =
proc getChanges(id: JsonNode): Future[JsonNode] {.async: (raises: [CancelledError, SubscriptionError]).} =
if mappedId =? subscriptions.subscriptionMapping.?[id]:
try:
let changes = await subscriptions.client.eth_getFilterChanges(mappedId)
if changes.kind == JArray:
return changes
except JsonRpcError:
except JsonRpcError as e:
await resubscribe(id)
# TODO: we could still miss some events between losing the subscription
# and resubscribing. We should probably adopt a strategy like ethers.js,
# whereby we keep track of the latest block number that we've seen
# filter changes for:
# https://github.com/ethers-io/ethers.js/blob/f97b92bbb1bde22fcc44100af78d7f31602863ab/packages/providers/src.ts/base-provider.ts#L977
except CancelledError as error:
raise error
except CatchableError:
# there's nothing we can do here
discard
if not ("filter not found" in e.msg):
raise newException(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg, e)
except CancelledError as e:
raise e
except SubscriptionError as e:
raise e
except CatchableError as e:
raise newException(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg, e)
return newJArray()
proc poll(id: JsonNode) {.async: (raises: [CancelledError]).} =
for change in await getChanges(id):
if callback =? subscriptions.getCallback(id):
callback(id, change)
without callback =? subscriptions.getCallback(id):
return
try:
for change in await getChanges(id):
callback(id, success(change))
except CancelledError as e:
raise e
except CatchableError as e:
callback(id, failure(JsonNode, e))
proc poll {.async: (raises: []).} =
try:
@ -213,16 +237,23 @@ method close*(subscriptions: PollingSubscriptions) {.async.} =
method subscribeBlocks(subscriptions: PollingSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async.} =
{.async, raises:[CancelledError].} =
proc getBlock(hash: BlockHash) {.async.} =
proc getBlock(hash: BlockHash) {.async: (raises:[]).} =
try:
if blck =? (await subscriptions.client.eth_getBlockByHash(hash, false)):
onBlock(blck)
except CatchableError:
onBlock(success(blck))
except CancelledError as e:
discard
except CatchableError as e:
let wrappedErr = newException(SubscriptionError, "HTTP polling: There was an exception while getting subscription's block: " & e.msg, e)
onBlock(failure(Block, wrappedErr))
proc callback(id: JsonNode, changeResult: ?!JsonNode) {.raises:[].} =
without change =? changeResult, error:
onBlock(failure(Block, error.toErr(SubscriptionError)))
return
proc callback(id, change: JsonNode) =
if hash =? BlockHash.fromJson(change):
asyncSpawn getBlock(hash)
@ -237,9 +268,13 @@ method subscribeLogs(subscriptions: PollingSubscriptions,
Future[JsonNode]
{.async.} =
proc callback(id, change: JsonNode) =
proc callback(id: JsonNode, changeResult: ?!JsonNode) =
without change =? changeResult, error:
onLog(failure(Log, error.toErr(SubscriptionError)))
return
if log =? Log.fromJson(change):
onLog(log)
onLog(success(log))
let id = await subscriptions.client.eth_newFilter(filter)
subscriptions.callbacks[id] = callback

View File

@ -1,18 +1,16 @@
import pkg/questionable
import ./basics
import ./errors
import ./provider
export basics
export errors
{.push raises: [].}
type
Signer* = ref object of RootObj
populateLock: AsyncLock
SignerError* = object of EthersError
template raiseSignerError(message: string, parent: ref ProviderError = nil) =
raise newException(SignerError, message, parent)
template convertError(body) =
try:

View File

@ -49,7 +49,7 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
let oldBlock = !await provider.getBlock(BlockTag.latest)
discard await provider.send("evm_mine")
var newBlock: Block
let blockHandler = proc(blck: Block) = newBlock = blck
let blockHandler = proc(blck: ?!Block) {.raises:[].}= newBlock = blck.value
let subscription = await provider.subscribe(blockHandler)
discard await provider.send("evm_mine")
check eventually newBlock.number.isSome
@ -98,7 +98,7 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
expect JsonRpcProviderError:
discard await provider.getBlock(BlockTag.latest)
expect JsonRpcProviderError:
discard await provider.subscribe(proc(_: Block) = discard)
discard await provider.subscribe(proc(_: ?!Block) = discard)
expect JsonRpcProviderError:
discard await provider.getSigner().sendTransaction(Transaction.example)

View File

@ -27,8 +27,8 @@ template subscriptionTests(subscriptions, client) =
test "subscribes to new blocks":
var latestBlock: Block
proc callback(blck: Block) =
latestBlock = blck
proc callback(blck: ?!Block) =
latestBlock = blck.value
let subscription = await subscriptions.subscribeBlocks(callback)
discard await client.call("evm_mine", newJArray())
check eventually latestBlock.number.isSome
@ -38,8 +38,9 @@ template subscriptionTests(subscriptions, client) =
test "stops listening to new blocks when unsubscribed":
var count = 0
proc callback(blck: Block) =
inc count
proc callback(blck: ?!Block) =
if blck.isOk:
inc count
let subscription = await subscriptions.subscribeBlocks(callback)
discard await client.call("evm_mine", newJArray())
check eventually count > 0
@ -51,8 +52,9 @@ template subscriptionTests(subscriptions, client) =
test "stops listening to new blocks when provider is closed":
var count = 0
proc callback(blck: Block) =
inc count
proc callback(blck: ?!Block) =
if blck.isOk:
inc count
discard await subscriptions.subscribeBlocks(callback)
discard await client.call("evm_mine", newJArray())
check eventually count > 0
@ -130,7 +132,7 @@ suite "HTTP polling subscriptions - filter not found":
test "filter not found error recreates log filter":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: Log) = discard
let emptyHandler = proc(log: ?!Log) = discard
check subscriptions.logFilters.len == 0
check subscriptions.subscriptionMapping.len == 0
@ -148,7 +150,7 @@ suite "HTTP polling subscriptions - filter not found":
test "recreated log filter can be still unsubscribed using the original id":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: Log) = discard
let emptyHandler = proc(log: ?!Log) = discard
let id = await subscriptions.subscribeLogs(filter, emptyHandler)
mockServer.invalidateFilter(id)
check eventually subscriptions.subscriptionMapping[id] != id
@ -159,7 +161,7 @@ suite "HTTP polling subscriptions - filter not found":
check not subscriptions.subscriptionMapping.hasKey id
test "filter not found error recreates block filter":
let emptyHandler = proc(blck: Block) = discard
let emptyHandler = proc(blck: ?!Block) = discard
check subscriptions.subscriptionMapping.len == 0
let id = await subscriptions.subscribeBlocks(emptyHandler)
@ -170,7 +172,7 @@ suite "HTTP polling subscriptions - filter not found":
check eventually subscriptions.subscriptionMapping[id] != id
test "recreated block filter can be still unsubscribed using the original id":
let emptyHandler = proc(blck: Block) = discard
let emptyHandler = proc(blck: ?!Block) = discard
let id = await subscriptions.subscribeBlocks(emptyHandler)
mockServer.invalidateFilter(id)
check eventually subscriptions.subscriptionMapping[id] != id
@ -181,7 +183,7 @@ suite "HTTP polling subscriptions - filter not found":
test "polling continues with new filter after temporary error":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: Log) = discard
let emptyHandler = proc(log: ?!Log) = discard
let id = await subscriptions.subscribeLogs(filter, emptyHandler)

View File

@ -149,8 +149,9 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
test "receives events when subscribed":
var transfers: seq[Transfer]
proc handleTransfer(transfer: Transfer) =
transfers.add(transfer)
proc handleTransfer(transferRes: ?!Transfer) =
if transfer =? transferRes:
transfers.add(transfer)
let signer0 = provider.getSigner(accounts[0])
let signer1 = provider.getSigner(accounts[1])
@ -171,8 +172,9 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
test "stops receiving events when unsubscribed":
var transfers: seq[Transfer]
proc handleTransfer(transfer: Transfer) =
transfers.add(transfer)
proc handleTransfer(transferRes: ?!Transfer) =
if transfer =? transferRes:
transfers.add(transfer)
let signer0 = provider.getSigner(accounts[0])