diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4b17225..419b719 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,6 +1,11 @@ name: CI -on: [push, pull_request, workflow_dispatch] +on: + push: + branches: + - main + pull_request: + workflow_dispatch: jobs: test: @@ -12,6 +17,8 @@ jobs: steps: - name: Checkout uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.sha }} - name: Install Nim uses: iffy/install-nim@v4 diff --git a/Readme.md b/Readme.md index cdf8de6..ae340f1 100644 --- a/Readme.md +++ b/Readme.md @@ -131,14 +131,22 @@ You can now subscribe to Transfer events by calling `subscribe` on the contract instance. ```nim -proc handleTransfer(transfer: Transfer) = - echo "received transfer: ", transfer +proc handleTransfer(transferResult: ?!Transfer) = + if transferResult.isOk: + echo "received transfer: ", transferResult.value + else: + echo "error during transfer: ", transferResult.error.msg let subscription = await token.subscribe(Transfer, handleTransfer) ``` When a Transfer event is emitted, the `handleTransfer` proc that you just -defined will be called. +defined will be called with a [Result](https://github.com/arnetheduck/nim-results) type +which contains the event value. + +In case there is some underlying error in the event subscription, the handler will +be called as well, but the Result will contain error instead, so do proper error +management in your handlers. When you're no longer interested in these events, you can unsubscribe: diff --git a/ethers/contract.nim b/ethers/contract.nim index 8857522..c47515b 100644 --- a/ethers/contract.nim +++ b/ethers/contract.nim @@ -3,6 +3,7 @@ import std/macros import std/sequtils import pkg/chronicles import pkg/chronos +import pkg/questionable import pkg/contractabi import ./basics import ./provider @@ -35,11 +36,10 @@ type gasLimit*: ?UInt256 CallOverrides* = ref object of TransactionOverrides blockTag*: ?BlockTag - ContractError* = object of EthersError Confirmable* = object response*: ?TransactionResponse convert*: ConvertCustomErrors - EventHandler*[E: Event] = proc(event: E) {.gcsafe, raises:[].} + EventHandler*[E: Event] = proc(event: ?!E) {.gcsafe, raises:[].} func new*(ContractType: type Contract, address: Address, @@ -292,9 +292,13 @@ proc subscribe*[E: Event](contract: Contract, let topic = topic($E, E.fieldTypes).toArray let filter = EventFilter(address: contract.address, topics: @[topic]) - proc logHandler(log: Log) {.raises: [].} = + proc logHandler(logResult: ?!Log) {.raises: [].} = + without log =? logResult, error: + handler(failure(E, error)) + return + if event =? E.decode(log.data, log.topics): - handler(event) + handler(success(event)) contract.provider.subscribe(filter, logHandler) diff --git a/ethers/errors.nim b/ethers/errors.nim index b45aee4..0151c93 100644 --- a/ethers/errors.nim +++ b/ethers/errors.nim @@ -1,7 +1,20 @@ import ./basics -type SolidityError* = object of EthersError +type + SolidityError* = object of EthersError + ContractError* = object of EthersError + SignerError* = object of EthersError + SubscriptionError* = object of EthersError + ProviderError* = object of EthersError + data*: ?seq[byte] {.push raises:[].} +proc toErr*[E1: ref CatchableError, E2: EthersError]( + e1: E1, + _: type E2, + msg: string = e1.msg): ref E2 = + + return newException(E2, msg, e1) + template errors*(types) {.pragma.} diff --git a/ethers/provider.nim b/ethers/provider.nim index 601c558..d6915f2 100644 --- a/ethers/provider.nim +++ b/ethers/provider.nim @@ -1,5 +1,6 @@ import pkg/chronicles import pkg/serde +import pkg/questionable import ./basics import ./transaction import ./blocktag @@ -8,13 +9,12 @@ import ./errors export basics export transaction export blocktag +export errors {.push raises: [].} type Provider* = ref object of RootObj - ProviderError* = object of EthersError - data*: ?seq[byte] EstimateGasError* = object of ProviderError transaction*: Transaction Subscription* = ref object of RootObj @@ -56,8 +56,8 @@ type effectiveGasPrice*: ?UInt256 status*: TransactionStatus transactionType* {.serialize("type"), deserialize("type").}: TransactionType - LogHandler* = proc(log: Log) {.gcsafe, raises:[].} - BlockHandler* = proc(blck: Block) {.gcsafe, raises:[].} + LogHandler* = proc(log: ?!Log) {.gcsafe, raises:[].} + BlockHandler* = proc(blck: ?!Block) {.gcsafe, raises:[].} Topic* = array[32, byte] Block* {.serialize.} = object number*: ?UInt256 @@ -227,7 +227,7 @@ proc confirm*( tx: TransactionResponse, confirmations = EthersDefaultConfirmations, timeout = EthersReceiptTimeoutBlks): Future[TransactionReceipt] - {.async: (raises: [CancelledError, ProviderError, EthersError]).} = + {.async: (raises: [CancelledError, ProviderError, SubscriptionError, EthersError]).} = ## Waits for a transaction to be mined and for the specified number of blocks ## to pass since it was mined (confirmations). The number of confirmations @@ -238,6 +238,12 @@ proc confirm*( assert confirmations > 0 var blockNumber: UInt256 + + ## We need initialized succesfull Result, because the first iteration of the `while` loop + ## bellow is triggered "manually" by calling `await updateBlockNumber` and not by block + ## subscription. If left uninitialized then the Result is in error state and error is raised. + ## This result is not used for block value, but for block subscription errors. + var blockSubscriptionResult: ?!Block = success(Block(number: UInt256.none, timestamp: 0.u256, hash: BlockHash.none)) let blockEvent = newAsyncEvent() proc updateBlockNumber {.async: (raises: []).} = @@ -250,7 +256,13 @@ proc confirm*( # there's nothing we can do here discard - proc onBlock(_: Block) = + proc onBlock(blckResult: ?!Block) = + blockSubscriptionResult = blckResult + + if blckResult.isErr: + blockEvent.fire() + return + # ignore block parameter; hardhat may call this with pending blocks asyncSpawn updateBlockNumber() @@ -264,6 +276,16 @@ proc confirm*( await blockEvent.wait() blockEvent.clear() + if blockSubscriptionResult.isErr: + let error = blockSubscriptionResult.error() + + if error of SubscriptionError: + raise (ref SubscriptionError)(error) + elif error of CancelledError: + raise (ref CancelledError)(error) + else: + raise error.toErr(ProviderError) + if blockNumber >= finish: await subscription.unsubscribe() raise newException(EthersError, "tx not mined before timeout") diff --git a/ethers/providers/jsonrpc/errors.nim b/ethers/providers/jsonrpc/errors.nim index c84d4c5..45bef72 100644 --- a/ethers/providers/jsonrpc/errors.nim +++ b/ethers/providers/jsonrpc/errors.nim @@ -1,9 +1,12 @@ import std/strutils import pkg/stew/byteutils import ../../basics +import ../../errors import ../../provider import ./conversions +export errors + {.push raises:[].} type JsonRpcProviderError* = object of ProviderError diff --git a/ethers/providers/jsonrpc/subscriptions.nim b/ethers/providers/jsonrpc/subscriptions.nim index 26ca5e5..64a8080 100644 --- a/ethers/providers/jsonrpc/subscriptions.nim +++ b/ethers/providers/jsonrpc/subscriptions.nim @@ -1,9 +1,12 @@ import std/tables import std/sequtils +import std/strutils import pkg/chronos +import pkg/questionable import pkg/json_rpc/rpcclient import pkg/serde import ../../basics +import ../../errors import ../../provider include ../../nimshims/hashes import ./rpccalls @@ -17,8 +20,7 @@ type callbacks: Table[JsonNode, SubscriptionCallback] methodHandlers: Table[string, MethodHandler] MethodHandler* = proc (j: JsonNode) {.gcsafe, raises: [].} - SubscriptionCallback = proc(id, arguments: JsonNode) {.gcsafe, raises:[].} - SubscriptionError* = object of EthersError + SubscriptionCallback = proc(id: JsonNode, arguments: ?!JsonNode) {.gcsafe, raises:[].} {.push raises:[].} @@ -54,7 +56,7 @@ proc setMethodHandler( method subscribeBlocks*(subscriptions: JsonRpcSubscriptions, onBlock: BlockHandler): Future[JsonNode] - {.async, base.} = + {.async, base, raises: [CancelledError].} = raiseAssert "not implemented" method subscribeLogs*(subscriptions: JsonRpcSubscriptions, @@ -75,14 +77,11 @@ method close*(subscriptions: JsonRpcSubscriptions) {.async, base.} = await subscriptions.unsubscribe(id) proc getCallback(subscriptions: JsonRpcSubscriptions, - id: JsonNode): ?SubscriptionCallback = + id: JsonNode): ?SubscriptionCallback {. raises:[].} = try: if not id.isNil and id in subscriptions.callbacks: - subscriptions.callbacks[id].some - else: - SubscriptionCallback.none - except KeyError: - SubscriptionCallback.none + return subscriptions.callbacks[id].some + except: discard # Web sockets @@ -96,17 +95,22 @@ proc new*(_: type JsonRpcSubscriptions, proc subscriptionHandler(arguments: JsonNode) {.raises:[].} = let id = arguments{"subscription"} or newJString("") if callback =? subscriptions.getCallback(id): - callback(id, arguments) + callback(id, success(arguments)) subscriptions.setMethodHandler("eth_subscription", subscriptionHandler) subscriptions method subscribeBlocks(subscriptions: WebSocketSubscriptions, onBlock: BlockHandler): Future[JsonNode] - {.async.} = - proc callback(id, arguments: JsonNode) {.raises: [].} = - if blck =? Block.fromJson(arguments{"result"}): - onBlock(blck) + {.async, raises: [].} = + proc callback(id: JsonNode, argumentsResult: ?!JsonNode) {.raises: [].} = + without arguments =? argumentsResult, error: + onBlock(failure(Block, error.toErr(SubscriptionError))) + return + + let res = Block.fromJson(arguments{"result"}).mapFailure(SubscriptionError) + onBlock(res) + let id = await subscriptions.client.eth_subscribe("newHeads") subscriptions.callbacks[id] = callback return id @@ -116,9 +120,14 @@ method subscribeLogs(subscriptions: WebSocketSubscriptions, onLog: LogHandler): Future[JsonNode] {.async.} = - proc callback(id, arguments: JsonNode) = - if log =? Log.fromJson(arguments{"result"}): - onLog(log) + proc callback(id: JsonNode, argumentsResult: ?!JsonNode) = + without arguments =? argumentsResult, error: + onLog(failure(Log, error.toErr(SubscriptionError))) + return + + let res = Log.fromJson(arguments{"result"}).mapFailure(SubscriptionError) + onLog(res) + let id = await subscriptions.client.eth_subscribe("logs", filter) subscriptions.callbacks[id] = callback return id @@ -150,7 +159,7 @@ proc new*(_: type JsonRpcSubscriptions, let subscriptions = PollingSubscriptions(client: client) - proc resubscribe(id: JsonNode) {.async: (raises: [CancelledError]).} = + proc resubscribe(id: JsonNode): Future[?!void] {.async: (raises: [CancelledError]).} = try: var newId: JsonNode # Log filters are stored in logFilters, block filters are not persisted @@ -162,36 +171,49 @@ proc new*(_: type JsonRpcSubscriptions, else: newId = await subscriptions.client.eth_newBlockFilter() subscriptions.subscriptionMapping[id] = newId - except CancelledError as error: - raise error - except CatchableError: - # there's nothing further we can do here - discard + except CancelledError as e: + raise e + except CatchableError as e: + return failure(void, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg)) - proc getChanges(id: JsonNode): Future[JsonNode] {.async: (raises: [CancelledError]).} = + return success() + + proc getChanges(id: JsonNode): Future[?!JsonNode] {.async: (raises: [CancelledError]).} = if mappedId =? subscriptions.subscriptionMapping.?[id]: try: let changes = await subscriptions.client.eth_getFilterChanges(mappedId) if changes.kind == JArray: - return changes - except JsonRpcError: - await resubscribe(id) + return success(changes) + except JsonRpcError as e: + if error =? (await resubscribe(id)).errorOption: + return failure(JsonNode, error) + # TODO: we could still miss some events between losing the subscription # and resubscribing. We should probably adopt a strategy like ethers.js, # whereby we keep track of the latest block number that we've seen # filter changes for: # https://github.com/ethers-io/ethers.js/blob/f97b92bbb1bde22fcc44100af78d7f31602863ab/packages/providers/src.ts/base-provider.ts#L977 - except CancelledError as error: - raise error - except CatchableError: - # there's nothing we can do here - discard - return newJArray() + + if not ("filter not found" in e.msg): + return failure(JsonNode, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg)) + except CancelledError as e: + raise e + except SubscriptionError as e: + return failure(JsonNode, e) + except CatchableError as e: + return failure(JsonNode, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg)) + return success(newJArray()) proc poll(id: JsonNode) {.async: (raises: [CancelledError]).} = - for change in await getChanges(id): - if callback =? subscriptions.getCallback(id): - callback(id, change) + without callback =? subscriptions.getCallback(id): + return + + without changes =? (await getChanges(id)), error: + callback(id, failure(JsonNode, error)) + return + + for change in changes: + callback(id, success(change)) proc poll {.async: (raises: []).} = try: @@ -213,16 +235,23 @@ method close*(subscriptions: PollingSubscriptions) {.async.} = method subscribeBlocks(subscriptions: PollingSubscriptions, onBlock: BlockHandler): Future[JsonNode] - {.async.} = + {.async, raises:[CancelledError].} = - proc getBlock(hash: BlockHash) {.async.} = + proc getBlock(hash: BlockHash) {.async: (raises:[]).} = try: if blck =? (await subscriptions.client.eth_getBlockByHash(hash, false)): - onBlock(blck) - except CatchableError: + onBlock(success(blck)) + except CancelledError: discard + except CatchableError as e: + let error = e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription's block: " & e.msg) + onBlock(failure(Block, error)) + + proc callback(id: JsonNode, changeResult: ?!JsonNode) {.raises:[].} = + without change =? changeResult, e: + onBlock(failure(Block, e.toErr(SubscriptionError))) + return - proc callback(id, change: JsonNode) = if hash =? BlockHash.fromJson(change): asyncSpawn getBlock(hash) @@ -237,9 +266,13 @@ method subscribeLogs(subscriptions: PollingSubscriptions, Future[JsonNode] {.async.} = - proc callback(id, change: JsonNode) = - if log =? Log.fromJson(change): - onLog(log) + proc callback(id: JsonNode, argumentsResult: ?!JsonNode) = + without arguments =? argumentsResult, error: + onLog(failure(Log, error.toErr(SubscriptionError))) + return + + let res = Log.fromJson(arguments).mapFailure(SubscriptionError) + onLog(res) let id = await subscriptions.client.eth_newFilter(filter) subscriptions.callbacks[id] = callback diff --git a/ethers/signer.nim b/ethers/signer.nim index b85aadc..76ee245 100644 --- a/ethers/signer.nim +++ b/ethers/signer.nim @@ -1,17 +1,18 @@ import pkg/questionable import ./basics +import ./errors import ./provider export basics +export errors {.push raises: [].} type Signer* = ref object of RootObj populateLock: AsyncLock - SignerError* = object of EthersError -template raiseSignerError(message: string, parent: ref ProviderError = nil) = +template raiseSignerError*(message: string, parent: ref ProviderError = nil) = raise newException(SignerError, message, parent) template convertError(body) = diff --git a/testmodule/providers/jsonrpc/rpc_mock.nim b/testmodule/providers/jsonrpc/rpc_mock.nim index 41d5491..4abdb6e 100644 --- a/testmodule/providers/jsonrpc/rpc_mock.nim +++ b/testmodule/providers/jsonrpc/rpc_mock.nim @@ -9,11 +9,12 @@ import pkg/json_rpc/errors type MockRpcHttpServer* = ref object filters*: seq[string] + nextGetChangesReturnsError*: bool srv: RpcHttpServer proc new*(_: type MockRpcHttpServer): MockRpcHttpServer = let srv = newRpcHttpServer(["127.0.0.1:0"]) - MockRpcHttpServer(filters: @[], srv: srv) + MockRpcHttpServer(filters: @[], srv: srv, nextGetChangesReturnsError: false) proc invalidateFilter*(server: MockRpcHttpServer, jsonId: JsonNode) = server.filters.keepItIf it != jsonId.getStr @@ -30,6 +31,9 @@ proc start*(server: MockRpcHttpServer) = return filterId server.srv.router.rpc("eth_getFilterChanges") do(id: string) -> seq[string]: + if server.nextGetChangesReturnsError: + raise (ref ApplicationError)(code: -32000, msg: "unknown error") + if id notin server.filters: raise (ref ApplicationError)(code: -32000, msg: "filter not found") diff --git a/testmodule/providers/jsonrpc/testJsonRpcProvider.nim b/testmodule/providers/jsonrpc/testJsonRpcProvider.nim index 90a28ff..5f09686 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcProvider.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcProvider.nim @@ -49,7 +49,7 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]: let oldBlock = !await provider.getBlock(BlockTag.latest) discard await provider.send("evm_mine") var newBlock: Block - let blockHandler = proc(blck: Block) = newBlock = blck + let blockHandler = proc(blck: ?!Block) {.raises:[].}= newBlock = blck.value let subscription = await provider.subscribe(blockHandler) discard await provider.send("evm_mine") check eventually newBlock.number.isSome @@ -98,7 +98,7 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]: expect JsonRpcProviderError: discard await provider.getBlock(BlockTag.latest) expect JsonRpcProviderError: - discard await provider.subscribe(proc(_: Block) = discard) + discard await provider.subscribe(proc(_: ?!Block) = discard) expect JsonRpcProviderError: discard await provider.getSigner().sendTransaction(Transaction.example) diff --git a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim index d20a152..8f5867a 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim @@ -27,8 +27,8 @@ template subscriptionTests(subscriptions, client) = test "subscribes to new blocks": var latestBlock: Block - proc callback(blck: Block) = - latestBlock = blck + proc callback(blck: ?!Block) = + latestBlock = blck.value let subscription = await subscriptions.subscribeBlocks(callback) discard await client.call("evm_mine", newJArray()) check eventually latestBlock.number.isSome @@ -38,8 +38,9 @@ template subscriptionTests(subscriptions, client) = test "stops listening to new blocks when unsubscribed": var count = 0 - proc callback(blck: Block) = - inc count + proc callback(blck: ?!Block) = + if blck.isOk: + inc count let subscription = await subscriptions.subscribeBlocks(callback) discard await client.call("evm_mine", newJArray()) check eventually count > 0 @@ -51,8 +52,9 @@ template subscriptionTests(subscriptions, client) = test "stops listening to new blocks when provider is closed": var count = 0 - proc callback(blck: Block) = - inc count + proc callback(blck: ?!Block) = + if blck.isOk: + inc count discard await subscriptions.subscribeBlocks(callback) discard await client.call("evm_mine", newJArray()) check eventually count > 0 @@ -97,7 +99,7 @@ suite "HTTP polling subscriptions": subscriptionTests(subscriptions, client) -suite "HTTP polling subscriptions - filter not found": +suite "HTTP polling subscriptions - mock tests": var subscriptions: PollingSubscriptions var client: RpcHttpClient @@ -130,7 +132,7 @@ suite "HTTP polling subscriptions - filter not found": test "filter not found error recreates log filter": let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) - let emptyHandler = proc(log: Log) = discard + let emptyHandler = proc(log: ?!Log) = discard check subscriptions.logFilters.len == 0 check subscriptions.subscriptionMapping.len == 0 @@ -148,7 +150,7 @@ suite "HTTP polling subscriptions - filter not found": test "recreated log filter can be still unsubscribed using the original id": let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) - let emptyHandler = proc(log: Log) = discard + let emptyHandler = proc(log: ?!Log) = discard let id = await subscriptions.subscribeLogs(filter, emptyHandler) mockServer.invalidateFilter(id) check eventually subscriptions.subscriptionMapping[id] != id @@ -159,7 +161,7 @@ suite "HTTP polling subscriptions - filter not found": check not subscriptions.subscriptionMapping.hasKey id test "filter not found error recreates block filter": - let emptyHandler = proc(blck: Block) = discard + let emptyHandler = proc(blck: ?!Block) = discard check subscriptions.subscriptionMapping.len == 0 let id = await subscriptions.subscribeBlocks(emptyHandler) @@ -170,7 +172,7 @@ suite "HTTP polling subscriptions - filter not found": check eventually subscriptions.subscriptionMapping[id] != id test "recreated block filter can be still unsubscribed using the original id": - let emptyHandler = proc(blck: Block) = discard + let emptyHandler = proc(blck: ?!Block) = discard let id = await subscriptions.subscribeBlocks(emptyHandler) mockServer.invalidateFilter(id) check eventually subscriptions.subscriptionMapping[id] != id @@ -181,7 +183,7 @@ suite "HTTP polling subscriptions - filter not found": test "polling continues with new filter after temporary error": let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) - let emptyHandler = proc(log: Log) = discard + let emptyHandler = proc(log: ?!Log) = discard let id = await subscriptions.subscribeLogs(filter, emptyHandler) @@ -191,3 +193,17 @@ suite "HTTP polling subscriptions - filter not found": await startServer() check eventually subscriptions.subscriptionMapping[id] != id + + test "calls callback with failed result on error": + let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) + var failedResultReceived = false + + proc handler(log: ?!Log) = + if log.isErr: + failedResultReceived = true + + let id = await subscriptions.subscribeLogs(filter, handler) + + await sleepAsync(50.milliseconds) + mockServer.nextGetChangesReturnsError = true + check eventually failedResultReceived diff --git a/testmodule/testContracts.nim b/testmodule/testContracts.nim index 4eac40e..53e2547 100644 --- a/testmodule/testContracts.nim +++ b/testmodule/testContracts.nim @@ -149,7 +149,10 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]: test "receives events when subscribed": var transfers: seq[Transfer] - proc handleTransfer(transfer: Transfer) = + proc handleTransfer(transferRes: ?!Transfer) = + without transfer =? transferRes, error: + echo error.msg + transfers.add(transfer) let signer0 = provider.getSigner(accounts[0]) @@ -171,8 +174,9 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]: test "stops receiving events when unsubscribed": var transfers: seq[Transfer] - proc handleTransfer(transfer: Transfer) = - transfers.add(transfer) + proc handleTransfer(transferRes: ?!Transfer) = + if transfer =? transferRes: + transfers.add(transfer) let signer0 = provider.getSigner(accounts[0])