feat: subscriptions get passed result questionable (#91)

Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
This commit is contained in:
Adam Uhlíř 2024-11-28 14:48:10 +01:00 committed by GitHub
parent 04c00e2d91
commit d88e4614b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 194 additions and 79 deletions

View File

@ -1,6 +1,11 @@
name: CI
on: [push, pull_request, workflow_dispatch]
on:
push:
branches:
- main
pull_request:
workflow_dispatch:
jobs:
test:
@ -12,6 +17,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha }}
- name: Install Nim
uses: iffy/install-nim@v4

View File

@ -131,14 +131,22 @@ You can now subscribe to Transfer events by calling `subscribe` on the contract
instance.
```nim
proc handleTransfer(transfer: Transfer) =
echo "received transfer: ", transfer
proc handleTransfer(transferResult: ?!Transfer) =
if transferResult.isOk:
echo "received transfer: ", transferResult.value
else:
echo "error during transfer: ", transferResult.error.msg
let subscription = await token.subscribe(Transfer, handleTransfer)
```
When a Transfer event is emitted, the `handleTransfer` proc that you just
defined will be called.
defined will be called with a [Result](https://github.com/arnetheduck/nim-results) type
which contains the event value.
In case there is some underlying error in the event subscription, the handler will
be called as well, but the Result will contain error instead, so do proper error
management in your handlers.
When you're no longer interested in these events, you can unsubscribe:

View File

@ -3,6 +3,7 @@ import std/macros
import std/sequtils
import pkg/chronicles
import pkg/chronos
import pkg/questionable
import pkg/contractabi
import ./basics
import ./provider
@ -35,11 +36,10 @@ type
gasLimit*: ?UInt256
CallOverrides* = ref object of TransactionOverrides
blockTag*: ?BlockTag
ContractError* = object of EthersError
Confirmable* = object
response*: ?TransactionResponse
convert*: ConvertCustomErrors
EventHandler*[E: Event] = proc(event: E) {.gcsafe, raises:[].}
EventHandler*[E: Event] = proc(event: ?!E) {.gcsafe, raises:[].}
func new*(ContractType: type Contract,
address: Address,
@ -292,9 +292,13 @@ proc subscribe*[E: Event](contract: Contract,
let topic = topic($E, E.fieldTypes).toArray
let filter = EventFilter(address: contract.address, topics: @[topic])
proc logHandler(log: Log) {.raises: [].} =
proc logHandler(logResult: ?!Log) {.raises: [].} =
without log =? logResult, error:
handler(failure(E, error))
return
if event =? E.decode(log.data, log.topics):
handler(event)
handler(success(event))
contract.provider.subscribe(filter, logHandler)

View File

@ -1,7 +1,20 @@
import ./basics
type SolidityError* = object of EthersError
type
SolidityError* = object of EthersError
ContractError* = object of EthersError
SignerError* = object of EthersError
SubscriptionError* = object of EthersError
ProviderError* = object of EthersError
data*: ?seq[byte]
{.push raises:[].}
proc toErr*[E1: ref CatchableError, E2: EthersError](
e1: E1,
_: type E2,
msg: string = e1.msg): ref E2 =
return newException(E2, msg, e1)
template errors*(types) {.pragma.}

View File

@ -1,5 +1,6 @@
import pkg/chronicles
import pkg/serde
import pkg/questionable
import ./basics
import ./transaction
import ./blocktag
@ -8,13 +9,12 @@ import ./errors
export basics
export transaction
export blocktag
export errors
{.push raises: [].}
type
Provider* = ref object of RootObj
ProviderError* = object of EthersError
data*: ?seq[byte]
EstimateGasError* = object of ProviderError
transaction*: Transaction
Subscription* = ref object of RootObj
@ -56,8 +56,8 @@ type
effectiveGasPrice*: ?UInt256
status*: TransactionStatus
transactionType* {.serialize("type"), deserialize("type").}: TransactionType
LogHandler* = proc(log: Log) {.gcsafe, raises:[].}
BlockHandler* = proc(blck: Block) {.gcsafe, raises:[].}
LogHandler* = proc(log: ?!Log) {.gcsafe, raises:[].}
BlockHandler* = proc(blck: ?!Block) {.gcsafe, raises:[].}
Topic* = array[32, byte]
Block* {.serialize.} = object
number*: ?UInt256
@ -227,7 +227,7 @@ proc confirm*(
tx: TransactionResponse,
confirmations = EthersDefaultConfirmations,
timeout = EthersReceiptTimeoutBlks): Future[TransactionReceipt]
{.async: (raises: [CancelledError, ProviderError, EthersError]).} =
{.async: (raises: [CancelledError, ProviderError, SubscriptionError, EthersError]).} =
## Waits for a transaction to be mined and for the specified number of blocks
## to pass since it was mined (confirmations). The number of confirmations
@ -238,6 +238,12 @@ proc confirm*(
assert confirmations > 0
var blockNumber: UInt256
## We need initialized succesfull Result, because the first iteration of the `while` loop
## bellow is triggered "manually" by calling `await updateBlockNumber` and not by block
## subscription. If left uninitialized then the Result is in error state and error is raised.
## This result is not used for block value, but for block subscription errors.
var blockSubscriptionResult: ?!Block = success(Block(number: UInt256.none, timestamp: 0.u256, hash: BlockHash.none))
let blockEvent = newAsyncEvent()
proc updateBlockNumber {.async: (raises: []).} =
@ -250,7 +256,13 @@ proc confirm*(
# there's nothing we can do here
discard
proc onBlock(_: Block) =
proc onBlock(blckResult: ?!Block) =
blockSubscriptionResult = blckResult
if blckResult.isErr:
blockEvent.fire()
return
# ignore block parameter; hardhat may call this with pending blocks
asyncSpawn updateBlockNumber()
@ -264,6 +276,16 @@ proc confirm*(
await blockEvent.wait()
blockEvent.clear()
if blockSubscriptionResult.isErr:
let error = blockSubscriptionResult.error()
if error of SubscriptionError:
raise (ref SubscriptionError)(error)
elif error of CancelledError:
raise (ref CancelledError)(error)
else:
raise error.toErr(ProviderError)
if blockNumber >= finish:
await subscription.unsubscribe()
raise newException(EthersError, "tx not mined before timeout")

View File

@ -1,9 +1,12 @@
import std/strutils
import pkg/stew/byteutils
import ../../basics
import ../../errors
import ../../provider
import ./conversions
export errors
{.push raises:[].}
type JsonRpcProviderError* = object of ProviderError

View File

@ -1,9 +1,12 @@
import std/tables
import std/sequtils
import std/strutils
import pkg/chronos
import pkg/questionable
import pkg/json_rpc/rpcclient
import pkg/serde
import ../../basics
import ../../errors
import ../../provider
include ../../nimshims/hashes
import ./rpccalls
@ -17,8 +20,7 @@ type
callbacks: Table[JsonNode, SubscriptionCallback]
methodHandlers: Table[string, MethodHandler]
MethodHandler* = proc (j: JsonNode) {.gcsafe, raises: [].}
SubscriptionCallback = proc(id, arguments: JsonNode) {.gcsafe, raises:[].}
SubscriptionError* = object of EthersError
SubscriptionCallback = proc(id: JsonNode, arguments: ?!JsonNode) {.gcsafe, raises:[].}
{.push raises:[].}
@ -54,7 +56,7 @@ proc setMethodHandler(
method subscribeBlocks*(subscriptions: JsonRpcSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async, base.} =
{.async, base, raises: [CancelledError].} =
raiseAssert "not implemented"
method subscribeLogs*(subscriptions: JsonRpcSubscriptions,
@ -75,14 +77,11 @@ method close*(subscriptions: JsonRpcSubscriptions) {.async, base.} =
await subscriptions.unsubscribe(id)
proc getCallback(subscriptions: JsonRpcSubscriptions,
id: JsonNode): ?SubscriptionCallback =
id: JsonNode): ?SubscriptionCallback {. raises:[].} =
try:
if not id.isNil and id in subscriptions.callbacks:
subscriptions.callbacks[id].some
else:
SubscriptionCallback.none
except KeyError:
SubscriptionCallback.none
return subscriptions.callbacks[id].some
except: discard
# Web sockets
@ -96,17 +95,22 @@ proc new*(_: type JsonRpcSubscriptions,
proc subscriptionHandler(arguments: JsonNode) {.raises:[].} =
let id = arguments{"subscription"} or newJString("")
if callback =? subscriptions.getCallback(id):
callback(id, arguments)
callback(id, success(arguments))
subscriptions.setMethodHandler("eth_subscription", subscriptionHandler)
subscriptions
method subscribeBlocks(subscriptions: WebSocketSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async.} =
proc callback(id, arguments: JsonNode) {.raises: [].} =
if blck =? Block.fromJson(arguments{"result"}):
onBlock(blck)
{.async, raises: [].} =
proc callback(id: JsonNode, argumentsResult: ?!JsonNode) {.raises: [].} =
without arguments =? argumentsResult, error:
onBlock(failure(Block, error.toErr(SubscriptionError)))
return
let res = Block.fromJson(arguments{"result"}).mapFailure(SubscriptionError)
onBlock(res)
let id = await subscriptions.client.eth_subscribe("newHeads")
subscriptions.callbacks[id] = callback
return id
@ -116,9 +120,14 @@ method subscribeLogs(subscriptions: WebSocketSubscriptions,
onLog: LogHandler):
Future[JsonNode]
{.async.} =
proc callback(id, arguments: JsonNode) =
if log =? Log.fromJson(arguments{"result"}):
onLog(log)
proc callback(id: JsonNode, argumentsResult: ?!JsonNode) =
without arguments =? argumentsResult, error:
onLog(failure(Log, error.toErr(SubscriptionError)))
return
let res = Log.fromJson(arguments{"result"}).mapFailure(SubscriptionError)
onLog(res)
let id = await subscriptions.client.eth_subscribe("logs", filter)
subscriptions.callbacks[id] = callback
return id
@ -150,7 +159,7 @@ proc new*(_: type JsonRpcSubscriptions,
let subscriptions = PollingSubscriptions(client: client)
proc resubscribe(id: JsonNode) {.async: (raises: [CancelledError]).} =
proc resubscribe(id: JsonNode): Future[?!void] {.async: (raises: [CancelledError]).} =
try:
var newId: JsonNode
# Log filters are stored in logFilters, block filters are not persisted
@ -162,36 +171,49 @@ proc new*(_: type JsonRpcSubscriptions,
else:
newId = await subscriptions.client.eth_newBlockFilter()
subscriptions.subscriptionMapping[id] = newId
except CancelledError as error:
raise error
except CatchableError:
# there's nothing further we can do here
discard
except CancelledError as e:
raise e
except CatchableError as e:
return failure(void, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg))
proc getChanges(id: JsonNode): Future[JsonNode] {.async: (raises: [CancelledError]).} =
return success()
proc getChanges(id: JsonNode): Future[?!JsonNode] {.async: (raises: [CancelledError]).} =
if mappedId =? subscriptions.subscriptionMapping.?[id]:
try:
let changes = await subscriptions.client.eth_getFilterChanges(mappedId)
if changes.kind == JArray:
return changes
except JsonRpcError:
await resubscribe(id)
return success(changes)
except JsonRpcError as e:
if error =? (await resubscribe(id)).errorOption:
return failure(JsonNode, error)
# TODO: we could still miss some events between losing the subscription
# and resubscribing. We should probably adopt a strategy like ethers.js,
# whereby we keep track of the latest block number that we've seen
# filter changes for:
# https://github.com/ethers-io/ethers.js/blob/f97b92bbb1bde22fcc44100af78d7f31602863ab/packages/providers/src.ts/base-provider.ts#L977
except CancelledError as error:
raise error
except CatchableError:
# there's nothing we can do here
discard
return newJArray()
if not ("filter not found" in e.msg):
return failure(JsonNode, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg))
except CancelledError as e:
raise e
except SubscriptionError as e:
return failure(JsonNode, e)
except CatchableError as e:
return failure(JsonNode, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg))
return success(newJArray())
proc poll(id: JsonNode) {.async: (raises: [CancelledError]).} =
for change in await getChanges(id):
if callback =? subscriptions.getCallback(id):
callback(id, change)
without callback =? subscriptions.getCallback(id):
return
without changes =? (await getChanges(id)), error:
callback(id, failure(JsonNode, error))
return
for change in changes:
callback(id, success(change))
proc poll {.async: (raises: []).} =
try:
@ -213,16 +235,23 @@ method close*(subscriptions: PollingSubscriptions) {.async.} =
method subscribeBlocks(subscriptions: PollingSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async.} =
{.async, raises:[CancelledError].} =
proc getBlock(hash: BlockHash) {.async.} =
proc getBlock(hash: BlockHash) {.async: (raises:[]).} =
try:
if blck =? (await subscriptions.client.eth_getBlockByHash(hash, false)):
onBlock(blck)
except CatchableError:
onBlock(success(blck))
except CancelledError:
discard
except CatchableError as e:
let error = e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription's block: " & e.msg)
onBlock(failure(Block, error))
proc callback(id: JsonNode, changeResult: ?!JsonNode) {.raises:[].} =
without change =? changeResult, e:
onBlock(failure(Block, e.toErr(SubscriptionError)))
return
proc callback(id, change: JsonNode) =
if hash =? BlockHash.fromJson(change):
asyncSpawn getBlock(hash)
@ -237,9 +266,13 @@ method subscribeLogs(subscriptions: PollingSubscriptions,
Future[JsonNode]
{.async.} =
proc callback(id, change: JsonNode) =
if log =? Log.fromJson(change):
onLog(log)
proc callback(id: JsonNode, argumentsResult: ?!JsonNode) =
without arguments =? argumentsResult, error:
onLog(failure(Log, error.toErr(SubscriptionError)))
return
let res = Log.fromJson(arguments).mapFailure(SubscriptionError)
onLog(res)
let id = await subscriptions.client.eth_newFilter(filter)
subscriptions.callbacks[id] = callback

View File

@ -1,17 +1,18 @@
import pkg/questionable
import ./basics
import ./errors
import ./provider
export basics
export errors
{.push raises: [].}
type
Signer* = ref object of RootObj
populateLock: AsyncLock
SignerError* = object of EthersError
template raiseSignerError(message: string, parent: ref ProviderError = nil) =
template raiseSignerError*(message: string, parent: ref ProviderError = nil) =
raise newException(SignerError, message, parent)
template convertError(body) =

View File

@ -9,11 +9,12 @@ import pkg/json_rpc/errors
type MockRpcHttpServer* = ref object
filters*: seq[string]
nextGetChangesReturnsError*: bool
srv: RpcHttpServer
proc new*(_: type MockRpcHttpServer): MockRpcHttpServer =
let srv = newRpcHttpServer(["127.0.0.1:0"])
MockRpcHttpServer(filters: @[], srv: srv)
MockRpcHttpServer(filters: @[], srv: srv, nextGetChangesReturnsError: false)
proc invalidateFilter*(server: MockRpcHttpServer, jsonId: JsonNode) =
server.filters.keepItIf it != jsonId.getStr
@ -30,6 +31,9 @@ proc start*(server: MockRpcHttpServer) =
return filterId
server.srv.router.rpc("eth_getFilterChanges") do(id: string) -> seq[string]:
if server.nextGetChangesReturnsError:
raise (ref ApplicationError)(code: -32000, msg: "unknown error")
if id notin server.filters:
raise (ref ApplicationError)(code: -32000, msg: "filter not found")

View File

@ -49,7 +49,7 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
let oldBlock = !await provider.getBlock(BlockTag.latest)
discard await provider.send("evm_mine")
var newBlock: Block
let blockHandler = proc(blck: Block) = newBlock = blck
let blockHandler = proc(blck: ?!Block) {.raises:[].}= newBlock = blck.value
let subscription = await provider.subscribe(blockHandler)
discard await provider.send("evm_mine")
check eventually newBlock.number.isSome
@ -98,7 +98,7 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
expect JsonRpcProviderError:
discard await provider.getBlock(BlockTag.latest)
expect JsonRpcProviderError:
discard await provider.subscribe(proc(_: Block) = discard)
discard await provider.subscribe(proc(_: ?!Block) = discard)
expect JsonRpcProviderError:
discard await provider.getSigner().sendTransaction(Transaction.example)

View File

@ -27,8 +27,8 @@ template subscriptionTests(subscriptions, client) =
test "subscribes to new blocks":
var latestBlock: Block
proc callback(blck: Block) =
latestBlock = blck
proc callback(blck: ?!Block) =
latestBlock = blck.value
let subscription = await subscriptions.subscribeBlocks(callback)
discard await client.call("evm_mine", newJArray())
check eventually latestBlock.number.isSome
@ -38,7 +38,8 @@ template subscriptionTests(subscriptions, client) =
test "stops listening to new blocks when unsubscribed":
var count = 0
proc callback(blck: Block) =
proc callback(blck: ?!Block) =
if blck.isOk:
inc count
let subscription = await subscriptions.subscribeBlocks(callback)
discard await client.call("evm_mine", newJArray())
@ -51,7 +52,8 @@ template subscriptionTests(subscriptions, client) =
test "stops listening to new blocks when provider is closed":
var count = 0
proc callback(blck: Block) =
proc callback(blck: ?!Block) =
if blck.isOk:
inc count
discard await subscriptions.subscribeBlocks(callback)
discard await client.call("evm_mine", newJArray())
@ -97,7 +99,7 @@ suite "HTTP polling subscriptions":
subscriptionTests(subscriptions, client)
suite "HTTP polling subscriptions - filter not found":
suite "HTTP polling subscriptions - mock tests":
var subscriptions: PollingSubscriptions
var client: RpcHttpClient
@ -130,7 +132,7 @@ suite "HTTP polling subscriptions - filter not found":
test "filter not found error recreates log filter":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: Log) = discard
let emptyHandler = proc(log: ?!Log) = discard
check subscriptions.logFilters.len == 0
check subscriptions.subscriptionMapping.len == 0
@ -148,7 +150,7 @@ suite "HTTP polling subscriptions - filter not found":
test "recreated log filter can be still unsubscribed using the original id":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: Log) = discard
let emptyHandler = proc(log: ?!Log) = discard
let id = await subscriptions.subscribeLogs(filter, emptyHandler)
mockServer.invalidateFilter(id)
check eventually subscriptions.subscriptionMapping[id] != id
@ -159,7 +161,7 @@ suite "HTTP polling subscriptions - filter not found":
check not subscriptions.subscriptionMapping.hasKey id
test "filter not found error recreates block filter":
let emptyHandler = proc(blck: Block) = discard
let emptyHandler = proc(blck: ?!Block) = discard
check subscriptions.subscriptionMapping.len == 0
let id = await subscriptions.subscribeBlocks(emptyHandler)
@ -170,7 +172,7 @@ suite "HTTP polling subscriptions - filter not found":
check eventually subscriptions.subscriptionMapping[id] != id
test "recreated block filter can be still unsubscribed using the original id":
let emptyHandler = proc(blck: Block) = discard
let emptyHandler = proc(blck: ?!Block) = discard
let id = await subscriptions.subscribeBlocks(emptyHandler)
mockServer.invalidateFilter(id)
check eventually subscriptions.subscriptionMapping[id] != id
@ -181,7 +183,7 @@ suite "HTTP polling subscriptions - filter not found":
test "polling continues with new filter after temporary error":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: Log) = discard
let emptyHandler = proc(log: ?!Log) = discard
let id = await subscriptions.subscribeLogs(filter, emptyHandler)
@ -191,3 +193,17 @@ suite "HTTP polling subscriptions - filter not found":
await startServer()
check eventually subscriptions.subscriptionMapping[id] != id
test "calls callback with failed result on error":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
var failedResultReceived = false
proc handler(log: ?!Log) =
if log.isErr:
failedResultReceived = true
let id = await subscriptions.subscribeLogs(filter, handler)
await sleepAsync(50.milliseconds)
mockServer.nextGetChangesReturnsError = true
check eventually failedResultReceived

View File

@ -149,7 +149,10 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
test "receives events when subscribed":
var transfers: seq[Transfer]
proc handleTransfer(transfer: Transfer) =
proc handleTransfer(transferRes: ?!Transfer) =
without transfer =? transferRes, error:
echo error.msg
transfers.add(transfer)
let signer0 = provider.getSigner(accounts[0])
@ -171,7 +174,8 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]:
test "stops receiving events when unsubscribed":
var transfers: seq[Transfer]
proc handleTransfer(transfer: Transfer) =
proc handleTransfer(transferRes: ?!Transfer) =
if transfer =? transferRes:
transfers.add(transfer)
let signer0 = provider.getSigner(accounts[0])