From 5ddcddbd88124a5a58e5e9c7e721d72ae40bf776 Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Wed, 3 Sep 2025 14:53:14 +0200 Subject: [PATCH 01/13] chore: fix warning --- testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim index 9417cfa..fc89736 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim @@ -211,7 +211,7 @@ suite "HTTP polling subscriptions - mock tests": if log.isErr: failedResultReceived = true - let id = await subscriptions.subscribeLogs(filter, handler) + discard await subscriptions.subscribeLogs(filter, handler) await sleepAsync(50.milliseconds) mockServer.nextGetChangesReturnsError = true From 26b19140a1a36f28268ae81d31d010e5b1845123 Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Wed, 3 Sep 2025 15:29:41 +0200 Subject: [PATCH 02/13] chore: formatting --- ethers/providers/jsonrpc.nim | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/ethers/providers/jsonrpc.nim b/ethers/providers/jsonrpc.nim index 65420a1..73ba2a4 100644 --- a/ethers/providers/jsonrpc.nim +++ b/ethers/providers/jsonrpc.nim @@ -53,7 +53,8 @@ proc new*( _: type JsonRpcProvider, url=defaultUrl, pollingInterval=defaultPollingInterval, - maxPriorityFeePerGas=defaultMaxPriorityFeePerGas): JsonRpcProvider {.raises: [JsonRpcProviderError].} = + maxPriorityFeePerGas=defaultMaxPriorityFeePerGas +): JsonRpcProvider {.raises: [JsonRpcProviderError].} = var initialized: Future[void] var client: RpcClient @@ -71,8 +72,10 @@ proc new*( let http = newRpcHttpClient(getHeaders = jsonHeaders) await http.connect(url) client = http - subscriptions = JsonRpcSubscriptions.new(http, - pollingInterval = pollingInterval) + subscriptions = JsonRpcSubscriptions.new( + http, + pollingInterval = pollingInterval + ) subscriptions.start() proc awaitClient(): Future[RpcClient] {. @@ -90,7 +93,11 @@ proc new*( return subscriptions initialized = initialize() - return JsonRpcProvider(client: awaitClient(), subscriptions: awaitSubscriptions(), maxPriorityFeePerGas: maxPriorityFeePerGas) + return JsonRpcProvider( + client: awaitClient(), + subscriptions: awaitSubscriptions(), + maxPriorityFeePerGas: maxPriorityFeePerGas + ) proc callImpl( client: RpcClient, call: string, args: JsonNode @@ -98,8 +105,9 @@ proc callImpl( try: let response = await client.call(call, %args) without json =? JsonNode.fromJson(response.string), error: - raiseJsonRpcProviderError "Failed to parse response '" & response.string & "': " & - error.msg + raiseJsonRpcProviderError( + "Failed to parse response '" & response.string & "': " & error.msg + ) return json except CancelledError as error: raise error From 99b600f554892138758e2d13930a2b4abd25be8e Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Wed, 3 Sep 2025 15:45:49 +0200 Subject: [PATCH 03/13] chore: cleanup config.nims files --- config.nims | 8 -------- testmodule/config.nims | 7 +------ 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/config.nims b/config.nims index 6181efc..bda1e13 100644 --- a/config.nims +++ b/config.nims @@ -1,10 +1,2 @@ --styleCheck:usages --styleCheck:error - -# begin Nimble config (version 1) -when fileExists("nimble.paths"): - include "nimble.paths" -# end Nimble config - -when (NimMajor, NimMinor) >= (2, 0): - --mm:refc diff --git a/testmodule/config.nims b/testmodule/config.nims index ba64806..b264091 100644 --- a/testmodule/config.nims +++ b/testmodule/config.nims @@ -1,7 +1,2 @@ -switch("path", "..") -when (NimMajor, NimMinor) >= (1, 4): - switch("hint", "XCannotRaiseY:off") -when (NimMajor, NimMinor, NimPatch) >= (1, 6, 11): - switch("warning", "BareExcept:off") - +--path:".." --define:"chronicles_enabled:off" From 119c0dff9c087f01fe6f38dfc2573540b81e67f1 Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Wed, 3 Sep 2025 15:45:58 +0200 Subject: [PATCH 04/13] chore: fix warning --- ethers/providers/jsonrpc.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethers/providers/jsonrpc.nim b/ethers/providers/jsonrpc.nim index 73ba2a4..5a81381 100644 --- a/ethers/providers/jsonrpc.nim +++ b/ethers/providers/jsonrpc.nim @@ -54,7 +54,7 @@ proc new*( url=defaultUrl, pollingInterval=defaultPollingInterval, maxPriorityFeePerGas=defaultMaxPriorityFeePerGas -): JsonRpcProvider {.raises: [JsonRpcProviderError].} = +): JsonRpcProvider {.raises: [].} = var initialized: Future[void] var client: RpcClient From 3b0f94f2b52396161ddbadd47e6e3def94ee5b28 Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Wed, 3 Sep 2025 16:12:27 +0200 Subject: [PATCH 05/13] refactor(subscriptions): remove unnecessary json-rpc method table --- ethers/providers/jsonrpc/subscriptions.nim | 31 ++++++---------------- 1 file changed, 8 insertions(+), 23 deletions(-) diff --git a/ethers/providers/jsonrpc/subscriptions.nim b/ethers/providers/jsonrpc/subscriptions.nim index 95c094e..096fcb9 100644 --- a/ethers/providers/jsonrpc/subscriptions.nim +++ b/ethers/providers/jsonrpc/subscriptions.nim @@ -18,7 +18,7 @@ type JsonRpcSubscriptions* = ref object of RootObj client: RpcClient callbacks: Table[JsonNode, SubscriptionCallback] - methodHandlers: Table[string, MethodHandler] + subscriptionHandler: MethodHandler # Used by both PollingSubscriptions and WebsocketSubscriptions to store # subscription filters so the subscriptions can be recreated. With # PollingSubscriptions, the RPC node might prune/forget about them, and with @@ -43,28 +43,13 @@ template `or`(a: JsonNode, b: typed): JsonNode = func start*(subscriptions: JsonRpcSubscriptions) = subscriptions.client.onProcessMessage = - proc(client: RpcClient, - line: string): Result[bool, string] {.gcsafe, raises: [].} = + proc(client: RpcClient, line: string): Result[bool, string] = if json =? JsonNode.fromJson(line): - if "method" in json: - let methodName = json{"method"}.getStr() - if methodName in subscriptions.methodHandlers: - let handler = subscriptions.methodHandlers.getOrDefault(methodName) - if not handler.isNil: - handler(json{"params"} or newJArray()) - # false = do not continue processing message using json_rpc's - # default processing handler - return ok false - - # true = continue processing message using json_rpc's default message handler - return ok true - -proc setMethodHandler( - subscriptions: JsonRpcSubscriptions, - `method`: string, - handler: MethodHandler -) = - subscriptions.methodHandlers[`method`] = handler + if "method" in json and json{"method"}.getStr() == "eth_subscription": + if handler =? subscriptions.subscriptionHandler: + handler(json{"params"} or newJArray()) + return ok false # do not process using json-rpc default handler + return ok true # continue processing using json-rpc default handler method subscribeBlocks*(subscriptions: JsonRpcSubscriptions, onBlock: BlockHandler): @@ -155,7 +140,7 @@ proc new*(_: type JsonRpcSubscriptions, let id = arguments{"subscription"} or newJString("") if callback =? subscriptions.getCallback(id): callback(id, success(arguments)) - subscriptions.setMethodHandler("eth_subscription", subscriptionHandler) + subscriptions.subscriptionHandler = subscriptionHandler if resubscribeInterval > 0: if resubscribeInterval >= 300: From b5377d5874c9e37d5eb868e15020a20842d08346 Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Tue, 9 Sep 2025 10:57:40 +0200 Subject: [PATCH 06/13] chore!(provider): update Log and Block types --- ethers/provider.nim | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ethers/provider.nim b/ethers/provider.nim index 55cc07c..844c9c4 100644 --- a/ethers/provider.nim +++ b/ethers/provider.nim @@ -31,6 +31,7 @@ type data*: seq[byte] logIndex*: UInt256 removed*: bool + address*: Address topics*: seq[Topic] TransactionHash* = array[32, byte] BlockHash* = array[32, byte] @@ -64,6 +65,7 @@ type timestamp*: UInt256 hash*: ?BlockHash baseFeePerGas* : ?UInt256 + logsBloom*: ?StUint[2048] PastTransaction* {.serialize.} = object blockHash*: BlockHash blockNumber*: UInt256 From c2d2909936e7da9e66ae69662c57c1681ffc97ff Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Tue, 9 Sep 2025 10:58:07 +0200 Subject: [PATCH 07/13] chore(provider): add BlockNumber type alias --- ethers/provider.nim | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ethers/provider.nim b/ethers/provider.nim index 844c9c4..e1da475 100644 --- a/ethers/provider.nim +++ b/ethers/provider.nim @@ -27,7 +27,7 @@ type FilterByBlockHash* {.serialize.} = ref object of EventFilter blockHash*: BlockHash Log* {.serialize.} = object - blockNumber*: UInt256 + blockNumber*: BlockNumber data*: seq[byte] logIndex*: UInt256 removed*: bool @@ -52,7 +52,7 @@ type blockHash*: ?BlockHash transactionHash*: TransactionHash logs*: seq[Log] - blockNumber*: ?UInt256 + blockNumber*: ?BlockNumber cumulativeGasUsed*: UInt256 effectiveGasPrice*: ?UInt256 status*: TransactionStatus @@ -61,14 +61,15 @@ type BlockHandler* = proc(blck: ?!Block) {.gcsafe, raises:[].} Topic* = array[32, byte] Block* {.serialize.} = object - number*: ?UInt256 + number*: ?BlockNumber timestamp*: UInt256 hash*: ?BlockHash baseFeePerGas* : ?UInt256 logsBloom*: ?StUint[2048] + BlockNumber* = UInt256 PastTransaction* {.serialize.} = object blockHash*: BlockHash - blockNumber*: UInt256 + blockNumber*: BlockNumber sender* {.serialize("from"), deserialize("from").}: Address gas*: UInt256 gasPrice*: UInt256 From 017245826f51c1182fe321f65ee754ba0c10f97b Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Tue, 9 Sep 2025 16:10:26 +0200 Subject: [PATCH 08/13] refactor(subscriptions): new implementation of subscriptions --- ethers/subscriptions.nim | 127 +++++++++++++++++++++++ ethers/subscriptions/blocksubscriber.nim | 63 +++++++++++ ethers/subscriptions/logsbloom.nim | 14 +++ 3 files changed, 204 insertions(+) create mode 100644 ethers/subscriptions.nim create mode 100644 ethers/subscriptions/blocksubscriber.nim create mode 100644 ethers/subscriptions/logsbloom.nim diff --git a/ethers/subscriptions.nim b/ethers/subscriptions.nim new file mode 100644 index 0000000..b71001e --- /dev/null +++ b/ethers/subscriptions.nim @@ -0,0 +1,127 @@ +import std/tables +import std/sequtils +import ./basics +import ./provider +import ./subscriptions/blocksubscriber +import ./subscriptions/logsbloom + +type + Subscriptions* = ref object + provider: Provider + blockSubscriber: BlockSubscriber + blockSubscriptions: Table[SubscriptionId, BlockHandler] + logSubscriptions: Table[SubscriptionId, (EventFilter, LogHandler)] + nextSubscriptionId: int + LocalSubscription* = ref object of Subscription + subscriptions: Subscriptions + id: SubscriptionId + SubscriptionId = int + +func len*(subscriptions: Subscriptions): int = + subscriptions.blockSubscriptions.len + subscriptions.logSubscriptions.len + +proc subscribe*( + subscriptions: Subscriptions, + onBlock: BlockHandler +): Future[Subscription] {.async:(raises:[ProviderError, CancelledError]).} = + let id = subscriptions.nextSubscriptionId + inc subscriptions.nextSubscriptionId + subscriptions.blockSubscriptions[id] = onBlock + await subscriptions.blockSubscriber.start() + LocalSubscription(subscriptions: subscriptions, id: id) + +proc subscribe*( + subscriptions: Subscriptions, + filter: EventFilter, + onLog: LogHandler +): Future[Subscription] {.async:(raises:[ProviderError, CancelledError]).} = + let id = subscriptions.nextSubscriptionId + inc subscriptions.nextSubscriptionId + subscriptions.logSubscriptions[id] = (filter, onLog) + await subscriptions.blockSubscriber.start() + LocalSubscription(subscriptions: subscriptions, id: id) + +method unsubscribe*( + subscription: LocalSubscription +) {.async:(raises:[ProviderError, CancelledError]).} = + let subscriptions = subscription.subscriptions + let id = subscription.id + subscriptions.logSubscriptions.del(id) + subscriptions.blockSubscriptions.del(id) + if subscriptions.len == 0: + await subscriptions.blockSubscriber.stop() + +proc getLogs( + subscriptions: Subscriptions, + filter: EventFilter, + blockTag: BlockTag +): Future[seq[Log]] {.async:(raises:[ProviderError, CancelledError]).} = + let logFilter = Filter() + logFilter.address = filter.address + logFilter.topics = filter.topics + logFilter.fromBlock = blockTag + logFilter.toBlock = blockTag + await subscriptions.provider.getLogs(logFilter) + +proc getLogs( + subscriptions: Subscriptions, + blck: Block +): Future[Table[SubscriptionId, seq[Log]]] {. + async:(raises:[ProviderError, CancelledError]) +.} = + without blockNumber =? blck.number: + return + let blockTag = BlockTag.init(blockNumber) + let ids = toSeq(subscriptions.logSubscriptions.keys) + for id in ids: + without (filter, _) =? subscriptions.logSubscriptions.?[id]: + continue + if filter notin blck: + continue + result[id] = await subscriptions.getLogs(filter, blockTag) + +proc processBlock( + subscriptions: Subscriptions, + blockNumber: BlockNumber +): Future[bool] {.async:(raises:[CancelledError]).} = + try: + let blockTag = BlockTag.init(blockNumber) + without blck =? await subscriptions.provider.getBlock(blockTag): + return false + if blck.logsBloom.isNone: + return false + let logs = await subscriptions.getLogs(blck) + for handler in subscriptions.blockSubscriptions.values: + handler(success blck) + for (id, logs) in logs.pairs: + if (_, handler) =? subscriptions.logSubscriptions.?[id]: + for log in logs: + handler(success log) + return true + except ProviderError: + return false + +func new*( + _: type Subscriptions, + provider: Provider, + pollingInterval: Duration +): Subscriptions = + let subscriptions = Subscriptions() + proc processBlock( + blockNumber: BlockNumber + ): Future[bool] {.async:(raises:[CancelledError]).} = + await subscriptions.processBlock(blockNumber) + let blockSubscriber = BlockSubscriber.new( + provider, + processBlock, + pollingInterval + ) + subscriptions.provider = provider + subscriptions.blockSubscriber = blockSubscriber + subscriptions + +proc close*(subscriptions: Subscriptions) {.async:(raises:[]).} = + await subscriptions.blockSubscriber.stop() + +proc update*(subscriptions: Subscriptions) = + subscriptions.blockSubscriber.update() diff --git a/ethers/subscriptions/blocksubscriber.nim b/ethers/subscriptions/blocksubscriber.nim new file mode 100644 index 0000000..305cd93 --- /dev/null +++ b/ethers/subscriptions/blocksubscriber.nim @@ -0,0 +1,63 @@ +import ../basics +import ../provider + +type + BlockSubscriber* = ref object + provider: Provider + processor: ProcessBlock + pollingInterval: Duration + lastSeen: BlockNumber + lastProcessed: BlockNumber + wake: AsyncEvent + looping: Future[void].Raising([]) + ProcessBlock* = + proc(number: BlockNumber): Future[bool] {.async:(raises:[CancelledError]).} + +func new*( + _: type BlockSubscriber, + provider: Provider, + processor: ProcessBlock, + pollingInterval: Duration +): BlockSubscriber = + BlockSubscriber( + provider: provider, + processor: processor, + pollingInterval: pollingInterval + ) + +proc sleep(subscriber: BlockSubscriber) {.async:(raises:[CancelledError]).} = + discard await subscriber.wake.wait().withTimeout(subscriber.pollingInterval) + subscriber.wake.clear() + +proc loop(subscriber: BlockSubscriber) {.async:(raises:[]).} = + try: + while true: + try: + await subscriber.sleep() + subscriber.lastSeen = await subscriber.provider.getBlockNumber() + for number in (subscriber.lastProcessed + 1)..subscriber.lastSeen: + if await subscriber.processor(number): + subscriber.lastProcessed = number + else: + break + except ProviderError: + discard + except CancelledError: + discard + +proc start*( + subscriber: BlockSubscriber +) {.async:(raises:[ProviderError, CancelledError]).} = + if subscriber.looping.isNil: + subscriber.lastSeen = await subscriber.provider.getBlockNumber() + subscriber.lastProcessed = subscriber.lastSeen + subscriber.wake = newAsyncEvent() + subscriber.looping = subscriber.loop() + +proc stop*(subscriber: BlockSubscriber) {.async:(raises:[]).} = + if looping =? subscriber.looping: + subscriber.looping = nil + await looping.cancelAndWait() + +proc update*(subscriber: BlockSubscriber) = + subscriber.wake.fire() diff --git a/ethers/subscriptions/logsbloom.nim b/ethers/subscriptions/logsbloom.nim new file mode 100644 index 0000000..7ecb32b --- /dev/null +++ b/ethers/subscriptions/logsbloom.nim @@ -0,0 +1,14 @@ +import pkg/eth/bloom +import ../basics +import ../provider + +func contains*(blck: Block, filter: EventFilter): bool = + without logsBloom =? blck.logsBloom: + return false + let bloomFilter = BloomFilter(value: logsBloom) + if filter.address.toArray notin bloomFilter: + return false + for topic in filter.topics: + if topic notin bloomFilter: + return false + return true From ff667cb8f0938befff9dbc71a7d1f47710a1b0de Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Tue, 9 Sep 2025 17:25:17 +0200 Subject: [PATCH 09/13] refactor!(subscriptions): replace old implementation --- Readme.md | 9 +- ethers/errors.nim | 1 - ethers/provider.nim | 2 +- ethers/providers/jsonrpc.nim | 185 +++------ ethers/providers/jsonrpc/subscriptions.nim | 364 ------------------ testmodule/providers/jsonrpc/rpc_mock.nim | 56 --- .../providers/jsonrpc/testJsonRpcProvider.nim | 21 +- .../providers/jsonrpc/testJsonRpcSigner.nim | 6 +- .../jsonrpc/testJsonRpcSubscriptions.nim | 254 +++--------- .../jsonrpc/testWsResubscription.nim | 56 --- testmodule/providers/testJsonRpc.nim | 1 - testmodule/testContracts.nim | 2 +- testmodule/testCustomErrors.nim | 4 +- testmodule/testEnums.nim | 4 +- testmodule/testErc20.nim | 2 +- testmodule/testGasEstimation.nim | 4 +- testmodule/testReturns.nim | 4 +- testmodule/testTesting.nim | 4 +- testmodule/testWallet.nim | 7 +- 19 files changed, 131 insertions(+), 855 deletions(-) delete mode 100644 ethers/providers/jsonrpc/subscriptions.nim delete mode 100644 testmodule/providers/jsonrpc/rpc_mock.nim delete mode 100644 testmodule/providers/jsonrpc/testWsResubscription.nim diff --git a/Readme.md b/Readme.md index 645735d..b1ff00e 100644 --- a/Readme.md +++ b/Readme.md @@ -33,7 +33,7 @@ JSON-RPC provider is supported: import ethers import chronos -let provider = JsonRpcProvider.new("ws://localhost:8545") +let provider = await JsonRpcProvider.connect("ws://localhost:8545") let accounts = await provider.listAccounts() ``` @@ -204,13 +204,6 @@ This library ships with some optional modules that provides convenience utilitie - `ethers/erc20` module provides you with ERC20 token implementation and its events -Hardhat websockets workaround ---------- - -If you're working with Hardhat, you might encounter an issue where [websocket subscriptions stop working after 5 minutes](https://github.com/NomicFoundation/hardhat/issues/2053). - -This library provides a workaround using the compile time `ws_resubscribe` symbol. When this symbol is defined and set to a value greater than 0, websocket subscriptions will automatically resubscribe after the amount of time (in seconds) specified. The recommended value is 240 seconds (4 minutes), eg `--define:ws_resubscribe=240`. - Contribution ------------ diff --git a/ethers/errors.nim b/ethers/errors.nim index 5fece97..89416b8 100644 --- a/ethers/errors.nim +++ b/ethers/errors.nim @@ -4,7 +4,6 @@ type SolidityError* = object of EthersError ContractError* = object of EthersError SignerError* = object of EthersError - SubscriptionError* = object of EthersError ProviderError* = object of EthersError data*: ?seq[byte] diff --git a/ethers/provider.nim b/ethers/provider.nim index e1da475..9ceb217 100644 --- a/ethers/provider.nim +++ b/ethers/provider.nim @@ -225,7 +225,7 @@ proc confirm*( tx: TransactionResponse, confirmations = EthersDefaultConfirmations, timeout = EthersReceiptTimeoutBlks): Future[TransactionReceipt] - {.async: (raises: [CancelledError, ProviderError, SubscriptionError, EthersError]).} = + {.async: (raises: [CancelledError, ProviderError, EthersError]).} = ## Waits for a transaction to be mined and for the specified number of blocks ## to pass since it was mined (confirmations). The number of confirmations diff --git a/ethers/providers/jsonrpc.nim b/ethers/providers/jsonrpc.nim index 5a81381..004c931 100644 --- a/ethers/providers/jsonrpc.nim +++ b/ethers/providers/jsonrpc.nim @@ -7,40 +7,26 @@ import pkg/json_rpc/errors import pkg/serde import ../basics import ../provider +import ../subscriptions import ../signer import ./jsonrpc/rpccalls import ./jsonrpc/conversions -import ./jsonrpc/subscriptions import ./jsonrpc/errors export basics export provider export chronicles export errors.JsonRpcProviderError -export subscriptions {.push raises: [].} logScope: topics = "ethers jsonrpc" -type - JsonRpcProvider* = ref object of Provider - client: Future[RpcClient] - subscriptions: Future[JsonRpcSubscriptions] - maxPriorityFeePerGas: UInt256 - - JsonRpcSubscription* = ref object of Subscription - subscriptions: JsonRpcSubscriptions - id: JsonNode - - # Signer - JsonRpcSigner* = ref object of Signer - provider: JsonRpcProvider - address: ?Address - JsonRpcSignerError* = object of SignerError - -# Provider +type JsonRpcProvider* = ref object of Provider + client: RpcClient + subscriptions: Subscriptions + maxPriorityFeePerGas: UInt256 const defaultUrl = "http://localhost:8545" const defaultPollingInterval = 4.seconds @@ -49,55 +35,26 @@ const defaultMaxPriorityFeePerGas = 1_000_000_000.u256 proc jsonHeaders: seq[(string, string)] = @[("Content-Type", "application/json")] -proc new*( +proc connect*( _: type JsonRpcProvider, url=defaultUrl, pollingInterval=defaultPollingInterval, maxPriorityFeePerGas=defaultMaxPriorityFeePerGas -): JsonRpcProvider {.raises: [].} = - - var initialized: Future[void] - var client: RpcClient - var subscriptions: JsonRpcSubscriptions - - proc initialize() {.async: (raises: [JsonRpcProviderError, CancelledError]).} = - convertError: - case parseUri(url).scheme - of "ws", "wss": - let websocket = newRpcWebSocketClient(getHeaders = jsonHeaders) - await websocket.connect(url) - client = websocket - subscriptions = JsonRpcSubscriptions.new(websocket) - else: - let http = newRpcHttpClient(getHeaders = jsonHeaders) - await http.connect(url) - client = http - subscriptions = JsonRpcSubscriptions.new( - http, - pollingInterval = pollingInterval - ) - subscriptions.start() - - proc awaitClient(): Future[RpcClient] {. - async: (raises: [JsonRpcProviderError, CancelledError]) - .} = - convertError: - await initialized - return client - - proc awaitSubscriptions(): Future[JsonRpcSubscriptions] {. - async: (raises: [JsonRpcProviderError, CancelledError]) - .} = - convertError: - await initialized - return subscriptions - - initialized = initialize() - return JsonRpcProvider( - client: awaitClient(), - subscriptions: awaitSubscriptions(), - maxPriorityFeePerGas: maxPriorityFeePerGas - ) +): Future[JsonRpcProvider] {.async:(raises: [JsonRpcProviderError, CancelledError]).} = + convertError: + let provider = JsonRpcProvider(maxPriorityFeePerGas: maxPriorityFeePerGas) + case parseUri(url).scheme + of "ws", "wss": + let websocket = newRpcWebSocketClient(getHeaders = jsonHeaders) + await websocket.connect(url) + provider.client = websocket + provider.subscriptions = Subscriptions.new(provider, pollingInterval) + else: + let http = newRpcHttpClient(getHeaders = jsonHeaders) + await http.connect(url) + provider.client = http + provider.subscriptions = Subscriptions.new(provider, pollingInterval) + return provider proc callImpl( client: RpcClient, call: string, args: JsonNode @@ -118,57 +75,44 @@ proc send*( provider: JsonRpcProvider, call: string, arguments: seq[JsonNode] = @[] ): Future[JsonNode] {.async: (raises: [ProviderError, CancelledError]).} = convertError: - let client = await provider.client - return await client.callImpl(call, %arguments) + return await provider.client.callImpl(call, %arguments) proc listAccounts*( provider: JsonRpcProvider ): Future[seq[Address]] {.async: (raises: [JsonRpcProviderError, CancelledError]).} = convertError: - let client = await provider.client - return await client.eth_accounts() - -proc getSigner*(provider: JsonRpcProvider): JsonRpcSigner = - JsonRpcSigner(provider: provider) - -proc getSigner*(provider: JsonRpcProvider, address: Address): JsonRpcSigner = - JsonRpcSigner(provider: provider, address: some address) + return await provider.client.eth_accounts() method getBlockNumber*( provider: JsonRpcProvider ): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = convertError: - let client = await provider.client - return await client.eth_blockNumber() + return await provider.client.eth_blockNumber() method getBlock*( provider: JsonRpcProvider, tag: BlockTag ): Future[?Block] {.async: (raises: [ProviderError, CancelledError]).} = convertError: - let client = await provider.client - return await client.eth_getBlockByNumber(tag, false) + return await provider.client.eth_getBlockByNumber(tag, false) method call*( provider: JsonRpcProvider, tx: Transaction, blockTag = BlockTag.latest ): Future[seq[byte]] {.async: (raises: [ProviderError, CancelledError]).} = convertError: - let client = await provider.client - return await client.eth_call(tx, blockTag) + return await provider.client.eth_call(tx, blockTag) method getGasPrice*( provider: JsonRpcProvider ): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = convertError: - let client = await provider.client - return await client.eth_gasPrice() + return await provider.client.eth_gasPrice() method getMaxPriorityFeePerGas*( provider: JsonRpcProvider ): Future[UInt256] {.async: (raises: [CancelledError]).} = try: convertError: - let client = await provider.client - return await client.eth_maxPriorityFeePerGas() + return await provider.client.eth_maxPriorityFeePerGas() except JsonRpcProviderError: # If the provider does not provide the implementation # let's just remove the manual value @@ -178,35 +122,31 @@ method getTransactionCount*( provider: JsonRpcProvider, address: Address, blockTag = BlockTag.latest ): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = convertError: - let client = await provider.client - return await client.eth_getTransactionCount(address, blockTag) + return await provider.client.eth_getTransactionCount(address, blockTag) method getTransaction*( provider: JsonRpcProvider, txHash: TransactionHash ): Future[?PastTransaction] {.async: (raises: [ProviderError, CancelledError]).} = convertError: - let client = await provider.client - return await client.eth_getTransactionByHash(txHash) + return await provider.client.eth_getTransactionByHash(txHash) method getTransactionReceipt*( provider: JsonRpcProvider, txHash: TransactionHash ): Future[?TransactionReceipt] {.async: (raises: [ProviderError, CancelledError]).} = convertError: - let client = await provider.client - return await client.eth_getTransactionReceipt(txHash) + return await provider.client.eth_getTransactionReceipt(txHash) method getLogs*( provider: JsonRpcProvider, filter: EventFilter ): Future[seq[Log]] {.async: (raises: [ProviderError, CancelledError]).} = convertError: - let client = await provider.client let logsJson = if filter of Filter: - await client.eth_getLogs(Filter(filter)) + await provider.client.eth_getLogs(Filter(filter)) elif filter of FilterByBlockHash: - await client.eth_getLogs(FilterByBlockHash(filter)) + await provider.client.eth_getLogs(FilterByBlockHash(filter)) else: - await client.eth_getLogs(filter) + await provider.client.eth_getLogs(filter) var logs: seq[Log] = @[] for logJson in logsJson.getElems: @@ -222,8 +162,7 @@ method estimateGas*( ): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = try: convertError: - let client = await provider.client - return await client.eth_estimateGas(transaction, blockTag) + return await provider.client.eth_estimateGas(transaction, blockTag) except ProviderError as error: raise (ref EstimateGasError)( msg: "Estimate gas failed: " & error.msg, @@ -236,47 +175,29 @@ method getChainId*( provider: JsonRpcProvider ): Future[UInt256] {.async: (raises: [ProviderError, CancelledError]).} = convertError: - let client = await provider.client try: - return await client.eth_chainId() + return await provider.client.eth_chainId() except CancelledError as error: raise error except CatchableError: - return parse(await client.net_version(), UInt256) + return parse(await provider.client.net_version(), UInt256) method sendTransaction*( provider: JsonRpcProvider, rawTransaction: seq[byte] ): Future[TransactionResponse] {.async: (raises: [ProviderError, CancelledError]).} = convertError: - let - client = await provider.client - hash = await client.eth_sendRawTransaction(rawTransaction) - + let hash = await provider.client.eth_sendRawTransaction(rawTransaction) return TransactionResponse(hash: hash, provider: provider) method subscribe*( provider: JsonRpcProvider, filter: EventFilter, onLog: LogHandler ): Future[Subscription] {.async: (raises: [ProviderError, CancelledError]).} = - convertError: - let subscriptions = await provider.subscriptions - let id = await subscriptions.subscribeLogs(filter, onLog) - return JsonRpcSubscription(subscriptions: subscriptions, id: id) + await provider.subscriptions.subscribe(filter, onLog) method subscribe*( provider: JsonRpcProvider, onBlock: BlockHandler ): Future[Subscription] {.async: (raises: [ProviderError, CancelledError]).} = - convertError: - let subscriptions = await provider.subscriptions - let id = await subscriptions.subscribeBlocks(onBlock) - return JsonRpcSubscription(subscriptions: subscriptions, id: id) - -method unsubscribe*( - subscription: JsonRpcSubscription -) {.async: (raises: [ProviderError, CancelledError]).} = - convertError: - let subscriptions = subscription.subscriptions - let id = subscription.id - await subscriptions.unsubscribe(id) + await provider.subscriptions.subscribe(onBlock) method isSyncing*( provider: JsonRpcProvider @@ -290,12 +211,14 @@ method close*( provider: JsonRpcProvider ) {.async: (raises: [ProviderError, CancelledError]).} = convertError: - let client = await provider.client - let subscriptions = await provider.subscriptions - await subscriptions.close() - await client.close() + await provider.subscriptions.close() + await provider.client.close() -# Signer +type + JsonRpcSigner* = ref object of Signer + provider: JsonRpcProvider + address: ?Address + JsonRpcSignerError* = object of SignerError proc raiseJsonRpcSignerError( message: string) {.raises: [JsonRpcSignerError].} = @@ -316,6 +239,12 @@ template convertSignerError(body) = except CatchableError as error: raise newException(JsonRpcSignerError, error.msg) +proc getSigner*(provider: JsonRpcProvider): JsonRpcSigner = + JsonRpcSigner(provider: provider) + +proc getSigner*(provider: JsonRpcProvider, address: Address): JsonRpcSigner = + JsonRpcSigner(provider: provider, address: some address) + method provider*(signer: JsonRpcSigner): Provider {.gcsafe, raises: [SignerError].} = @@ -337,9 +266,8 @@ method signMessage*( signer: JsonRpcSigner, message: seq[byte] ): Future[seq[byte]] {.async: (raises: [SignerError, CancelledError]).} = convertSignerError: - let client = await signer.provider.client let address = await signer.getAddress() - return await client.personal_sign(message, address) + return await signer.provider.client.personal_sign(message, address) method sendTransaction*( signer: JsonRpcSigner, transaction: Transaction @@ -347,8 +275,5 @@ method sendTransaction*( async: (raises: [SignerError, ProviderError, CancelledError]) .} = convertError: - let - client = await signer.provider.client - hash = await client.eth_sendTransaction(transaction) - + let hash = await signer.provider.client.eth_sendTransaction(transaction) return TransactionResponse(hash: hash, provider: signer.provider) diff --git a/ethers/providers/jsonrpc/subscriptions.nim b/ethers/providers/jsonrpc/subscriptions.nim deleted file mode 100644 index 096fcb9..0000000 --- a/ethers/providers/jsonrpc/subscriptions.nim +++ /dev/null @@ -1,364 +0,0 @@ -import std/tables -import std/sequtils -import std/strutils -import pkg/chronos -import pkg/questionable -import pkg/json_rpc/rpcclient -import pkg/serde -import ../../basics -import ../../errors -import ../../provider -include ../../nimshims/hashes -import ./rpccalls -import ./conversions - -export serde - -type - JsonRpcSubscriptions* = ref object of RootObj - client: RpcClient - callbacks: Table[JsonNode, SubscriptionCallback] - subscriptionHandler: MethodHandler - # Used by both PollingSubscriptions and WebsocketSubscriptions to store - # subscription filters so the subscriptions can be recreated. With - # PollingSubscriptions, the RPC node might prune/forget about them, and with - # WebsocketSubscriptions, when using hardhat, subscriptions are dropped after 5 - # minutes. - logFilters: Table[JsonNode, EventFilter] - MethodHandler* = proc (j: JsonNode) {.gcsafe, raises: [].} - SubscriptionCallback = proc(id: JsonNode, arguments: ?!JsonNode) {.gcsafe, raises:[].} - -{.push raises:[].} - -template convertErrorsToSubscriptionError(body) = - try: - body - except CancelledError as error: - raise error - except CatchableError as error: - raise error.toErr(SubscriptionError) - -template `or`(a: JsonNode, b: typed): JsonNode = - if a.isNil: b else: a - -func start*(subscriptions: JsonRpcSubscriptions) = - subscriptions.client.onProcessMessage = - proc(client: RpcClient, line: string): Result[bool, string] = - if json =? JsonNode.fromJson(line): - if "method" in json and json{"method"}.getStr() == "eth_subscription": - if handler =? subscriptions.subscriptionHandler: - handler(json{"params"} or newJArray()) - return ok false # do not process using json-rpc default handler - return ok true # continue processing using json-rpc default handler - -method subscribeBlocks*(subscriptions: JsonRpcSubscriptions, - onBlock: BlockHandler): - Future[JsonNode] - {.async: (raises: [SubscriptionError, CancelledError]), base,.} = - raiseAssert "not implemented" - -method subscribeLogs*(subscriptions: JsonRpcSubscriptions, - filter: EventFilter, - onLog: LogHandler): - Future[JsonNode] - {.async: (raises: [SubscriptionError, CancelledError]), base.} = - raiseAssert "not implemented" - -method unsubscribe*(subscriptions: JsonRpcSubscriptions, - id: JsonNode) - {.async: (raises: [CancelledError]), base.} = - raiseAssert "not implemented " - -method close*(subscriptions: JsonRpcSubscriptions) {.async: (raises: []), base.} = - let ids = toSeq subscriptions.callbacks.keys - for id in ids: - try: - await subscriptions.unsubscribe(id) - except CatchableError as e: - error "JsonRpc unsubscription failed", error = e.msg, id = id - -proc getCallback(subscriptions: JsonRpcSubscriptions, - id: JsonNode): ?SubscriptionCallback {. raises:[].} = - try: - if not id.isNil and id in subscriptions.callbacks: - return subscriptions.callbacks[id].some - except: discard - -# Web sockets - -# Default re-subscription period is seconds -const WsResubscribe {.intdefine.}: int = 0 - -type - WebSocketSubscriptions = ref object of JsonRpcSubscriptions - logFiltersLock: AsyncLock - resubscribeFut: Future[void] - resubscribeInterval: int - -template withLock*(subscriptions: WebSocketSubscriptions, body: untyped) = - if subscriptions.logFiltersLock.isNil: - subscriptions.logFiltersLock = newAsyncLock() - - await subscriptions.logFiltersLock.acquire() - try: - body - finally: - subscriptions.logFiltersLock.release() - -# This is a workaround to manage the 5 minutes limit due to hardhat. -# See https://github.com/NomicFoundation/hardhat/issues/2053#issuecomment-1061374064 -proc resubscribeWebsocketEventsOnTimeout*(subscriptions: WebsocketSubscriptions) {.async: (raises: [CancelledError]).} = - while true: - await sleepAsync(subscriptions.resubscribeInterval.seconds) - try: - withLock(subscriptions): - for id, callback in subscriptions.callbacks: - - var newId: JsonNode - if id in subscriptions.logFilters: - let filter = subscriptions.logFilters[id] - newId = await subscriptions.client.eth_subscribe("logs", filter) - subscriptions.logFilters[newId] = filter - subscriptions.logFilters.del(id) - else: - newId = await subscriptions.client.eth_subscribe("newHeads") - - subscriptions.callbacks[newId] = callback - subscriptions.callbacks.del(id) - discard await subscriptions.client.eth_unsubscribe(id) - except CancelledError as e: - raise e - except CatchableError as e: - error "WS resubscription failed" , error = e.msg - -proc new*(_: type JsonRpcSubscriptions, - client: RpcWebSocketClient, - resubscribeInterval = WsResubscribe): JsonRpcSubscriptions = - let subscriptions = WebSocketSubscriptions(client: client, resubscribeInterval: resubscribeInterval) - - proc subscriptionHandler(arguments: JsonNode) {.raises:[].} = - let id = arguments{"subscription"} or newJString("") - if callback =? subscriptions.getCallback(id): - callback(id, success(arguments)) - subscriptions.subscriptionHandler = subscriptionHandler - - if resubscribeInterval > 0: - if resubscribeInterval >= 300: - warn "Resubscription interval greater than 300 seconds is useless for hardhat workaround", resubscribeInterval = resubscribeInterval - - subscriptions.resubscribeFut = resubscribeWebsocketEventsOnTimeout(subscriptions) - - subscriptions - -method subscribeBlocks(subscriptions: WebSocketSubscriptions, - onBlock: BlockHandler): - Future[JsonNode] - {.async: (raises: [SubscriptionError, CancelledError]).} = - proc callback(id: JsonNode, argumentsResult: ?!JsonNode) {.raises: [].} = - without arguments =? argumentsResult, error: - onBlock(failure(Block, error.toErr(SubscriptionError))) - return - - let res = Block.fromJson(arguments{"result"}).mapFailure(SubscriptionError) - onBlock(res) - - convertErrorsToSubscriptionError: - withLock(subscriptions): - let id = await subscriptions.client.eth_subscribe("newHeads") - subscriptions.callbacks[id] = callback - return id - -method subscribeLogs(subscriptions: WebSocketSubscriptions, - filter: EventFilter, - onLog: LogHandler): - Future[JsonNode] - {.async: (raises: [SubscriptionError, CancelledError]).} = - proc callback(id: JsonNode, argumentsResult: ?!JsonNode) = - without arguments =? argumentsResult, error: - onLog(failure(Log, error.toErr(SubscriptionError))) - return - - let res = Log.fromJson(arguments{"result"}).mapFailure(SubscriptionError) - onLog(res) - - convertErrorsToSubscriptionError: - withLock(subscriptions): - let id = await subscriptions.client.eth_subscribe("logs", filter) - subscriptions.callbacks[id] = callback - subscriptions.logFilters[id] = filter - return id - -method unsubscribe*(subscriptions: WebSocketSubscriptions, - id: JsonNode) - {.async: (raises: [CancelledError]).} = - try: - withLock(subscriptions): - subscriptions.callbacks.del(id) - discard await subscriptions.client.eth_unsubscribe(id) - except CancelledError as e: - raise e - except CatchableError: - # Ignore if uninstallation of the subscribiton fails. - discard - -method close*(subscriptions: WebSocketSubscriptions) {.async: (raises: []).} = - await procCall JsonRpcSubscriptions(subscriptions).close() - if not subscriptions.resubscribeFut.isNil: - await subscriptions.resubscribeFut.cancelAndWait() - -# Polling - -type - PollingSubscriptions* = ref object of JsonRpcSubscriptions - polling: Future[void] - - # Used when filters are recreated to translate from the id that user - # originally got returned to new filter id - subscriptionMapping: Table[JsonNode, JsonNode] - -proc new*(_: type JsonRpcSubscriptions, - client: RpcHttpClient, - pollingInterval = 4.seconds): JsonRpcSubscriptions = - - let subscriptions = PollingSubscriptions(client: client) - - proc resubscribe(id: JsonNode): Future[?!void] {.async: (raises: [CancelledError]).} = - try: - var newId: JsonNode - # Log filters are stored in logFilters, block filters are not persisted - # there is they do not need any specific data for their recreation. - # We use this to determine if the filter was log or block filter here. - if subscriptions.logFilters.hasKey(id): - let filter = subscriptions.logFilters[id] - newId = await subscriptions.client.eth_newFilter(filter) - else: - newId = await subscriptions.client.eth_newBlockFilter() - subscriptions.subscriptionMapping[id] = newId - except CancelledError as e: - raise e - except CatchableError as e: - return failure(void, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg)) - - return success() - - proc getChanges(id: JsonNode): Future[?!JsonNode] {.async: (raises: [CancelledError]).} = - if mappedId =? subscriptions.subscriptionMapping.?[id]: - try: - let changes = await subscriptions.client.eth_getFilterChanges(mappedId) - if changes.kind == JArray: - return success(changes) - except JsonRpcError as e: - if error =? (await resubscribe(id)).errorOption: - return failure(JsonNode, error) - - # TODO: we could still miss some events between losing the subscription - # and resubscribing. We should probably adopt a strategy like ethers.js, - # whereby we keep track of the latest block number that we've seen - # filter changes for: - # https://github.com/ethers-io/ethers.js/blob/f97b92bbb1bde22fcc44100af78d7f31602863ab/packages/providers/src.ts/base-provider.ts#L977 - - if not ("filter not found" in e.msg): - return failure(JsonNode, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg)) - except CancelledError as e: - raise e - except SubscriptionError as e: - return failure(JsonNode, e) - except CatchableError as e: - return failure(JsonNode, e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription changes: " & e.msg)) - return success(newJArray()) - - proc poll(id: JsonNode) {.async: (raises: [CancelledError]).} = - without callback =? subscriptions.getCallback(id): - return - - without changes =? (await getChanges(id)), error: - callback(id, failure(JsonNode, error)) - return - - for change in changes: - callback(id, success(change)) - - proc poll {.async: (raises: []).} = - try: - while true: - for id in toSeq subscriptions.callbacks.keys: - await poll(id) - await sleepAsync(pollingInterval) - except CancelledError: - discard - - subscriptions.polling = poll() - asyncSpawn subscriptions.polling - subscriptions - -method close*(subscriptions: PollingSubscriptions) {.async: (raises: []).} = - await subscriptions.polling.cancelAndWait() - await procCall JsonRpcSubscriptions(subscriptions).close() - -method subscribeBlocks(subscriptions: PollingSubscriptions, - onBlock: BlockHandler): - Future[JsonNode] - {.async: (raises: [SubscriptionError, CancelledError]).} = - - proc getBlock(hash: BlockHash) {.async: (raises:[]).} = - try: - if blck =? (await subscriptions.client.eth_getBlockByHash(hash, false)): - onBlock(success(blck)) - except CancelledError: - discard - except CatchableError as e: - let error = e.toErr(SubscriptionError, "HTTP polling: There was an exception while getting subscription's block: " & e.msg) - onBlock(failure(Block, error)) - - proc callback(id: JsonNode, changeResult: ?!JsonNode) {.raises:[].} = - without change =? changeResult, e: - onBlock(failure(Block, e.toErr(SubscriptionError))) - return - - if hash =? BlockHash.fromJson(change): - asyncSpawn getBlock(hash) - - convertErrorsToSubscriptionError: - let id = await subscriptions.client.eth_newBlockFilter() - subscriptions.callbacks[id] = callback - subscriptions.subscriptionMapping[id] = id - return id - -method subscribeLogs(subscriptions: PollingSubscriptions, - filter: EventFilter, - onLog: LogHandler): - Future[JsonNode] - {.async: (raises: [SubscriptionError, CancelledError]).} = - - proc callback(id: JsonNode, argumentsResult: ?!JsonNode) = - without arguments =? argumentsResult, error: - onLog(failure(Log, error.toErr(SubscriptionError))) - return - - let res = Log.fromJson(arguments).mapFailure(SubscriptionError) - onLog(res) - - convertErrorsToSubscriptionError: - let id = await subscriptions.client.eth_newFilter(filter) - subscriptions.callbacks[id] = callback - subscriptions.logFilters[id] = filter - subscriptions.subscriptionMapping[id] = id - return id - -method unsubscribe*(subscriptions: PollingSubscriptions, - id: JsonNode) - {.async: (raises: [CancelledError]).} = - try: - subscriptions.logFilters.del(id) - subscriptions.callbacks.del(id) - if sub =? subscriptions.subscriptionMapping.?[id]: - subscriptions.subscriptionMapping.del(id) - discard await subscriptions.client.eth_uninstallFilter(sub) - except CancelledError as e: - raise e - except CatchableError: - # Ignore if uninstallation of the filter fails. If it's the last step in our - # cleanup, then filter changes for this filter will no longer be polled so - # if the filter continues to live on in geth for whatever reason then it - # doesn't matter. - discard diff --git a/testmodule/providers/jsonrpc/rpc_mock.nim b/testmodule/providers/jsonrpc/rpc_mock.nim deleted file mode 100644 index 4abdb6e..0000000 --- a/testmodule/providers/jsonrpc/rpc_mock.nim +++ /dev/null @@ -1,56 +0,0 @@ -import ../../examples -import ../../../ethers/provider -import ../../../ethers/providers/jsonrpc/conversions - -import std/sequtils -import pkg/stew/byteutils -import pkg/json_rpc/rpcserver except `%`, `%*` -import pkg/json_rpc/errors - -type MockRpcHttpServer* = ref object - filters*: seq[string] - nextGetChangesReturnsError*: bool - srv: RpcHttpServer - -proc new*(_: type MockRpcHttpServer): MockRpcHttpServer = - let srv = newRpcHttpServer(["127.0.0.1:0"]) - MockRpcHttpServer(filters: @[], srv: srv, nextGetChangesReturnsError: false) - -proc invalidateFilter*(server: MockRpcHttpServer, jsonId: JsonNode) = - server.filters.keepItIf it != jsonId.getStr - -proc start*(server: MockRpcHttpServer) = - server.srv.router.rpc("eth_newFilter") do(filter: EventFilter) -> string: - let filterId = "0x" & (array[16, byte].example).toHex - server.filters.add filterId - return filterId - - server.srv.router.rpc("eth_newBlockFilter") do() -> string: - let filterId = "0x" & (array[16, byte].example).toHex - server.filters.add filterId - return filterId - - server.srv.router.rpc("eth_getFilterChanges") do(id: string) -> seq[string]: - if server.nextGetChangesReturnsError: - raise (ref ApplicationError)(code: -32000, msg: "unknown error") - - if id notin server.filters: - raise (ref ApplicationError)(code: -32000, msg: "filter not found") - - return @[] - - server.srv.router.rpc("eth_uninstallFilter") do(id: string) -> bool: - if id notin server.filters: - raise (ref ApplicationError)(code: -32000, msg: "filter not found") - - server.invalidateFilter(%id) - return true - - server.srv.start() - -proc stop*(server: MockRpcHttpServer) {.async.} = - await server.srv.stop() - await server.srv.closeWait() - -proc localAddress*(server: MockRpcHttpServer): seq[TransportAddress] = - return server.srv.localAddress() diff --git a/testmodule/providers/jsonrpc/testJsonRpcProvider.nim b/testmodule/providers/jsonrpc/testJsonRpcProvider.nim index 5b79c5a..fae1e07 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcProvider.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcProvider.nim @@ -15,14 +15,14 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]: var provider: JsonRpcProvider setup: - provider = JsonRpcProvider.new(url, pollingInterval = 100.millis) - + provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis) teardown: await provider.close() test "can be instantiated with a default URL": - discard JsonRpcProvider.new() + let provider = await JsonRpcProvider.connect() + await provider.close() test "lists all accounts": let accounts = await provider.listAccounts() @@ -87,20 +87,9 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]: expect EthersError: discard await confirming - test "raises JsonRpcProviderError when something goes wrong": - let provider = JsonRpcProvider.new("http://invalid.") + test "raises JsonRpcProviderError when connect fails": expect JsonRpcProviderError: - discard await provider.listAccounts() - expect JsonRpcProviderError: - discard await provider.send("evm_mine") - expect JsonRpcProviderError: - discard await provider.getBlockNumber() - expect JsonRpcProviderError: - discard await provider.getBlock(BlockTag.latest) - expect JsonRpcProviderError: - discard await provider.subscribe(proc(_: ?!Block) = discard) - expect JsonRpcProviderError: - discard await provider.getSigner().sendTransaction(Transaction.example) + discard await JsonRpcProvider.connect("http://invalid.") test "syncing": let isSyncing = await provider.isSyncing() diff --git a/testmodule/providers/jsonrpc/testJsonRpcSigner.nim b/testmodule/providers/jsonrpc/testJsonRpcSigner.nim index 31d6df5..c4c1dd5 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcSigner.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcSigner.nim @@ -8,10 +8,10 @@ suite "JsonRpcSigner": var provider: JsonRpcProvider var accounts: seq[Address] - let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") + let url = "http://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") setup: - provider = JsonRpcProvider.new("http://" & providerUrl) + provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis) accounts = await provider.listAccounts() teardown: @@ -75,7 +75,7 @@ suite "JsonRpcSigner": let blk = !(await signer.provider.getBlock(BlockTag.latest)) transaction.maxFeePerGas = some(!blk.baseFeePerGas * 2.u256 + !populated.maxPriorityFeePerGas) - + check populated == transaction test "populate fails when sender does not match signer address": diff --git a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim index fc89736..4a885f3 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim @@ -1,218 +1,66 @@ import std/os -import std/importutils import pkg/asynctest/chronos/unittest import pkg/serde import pkg/json_rpc/rpcclient import pkg/json_rpc/rpcserver -import ethers/provider -import ethers/providers/jsonrpc/subscriptions +import ethers/providers/jsonrpc -import ../../examples -import ./rpc_mock +let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") +for url in ["ws://" & providerUrl, "http://" & providerUrl]: -suite "JsonRpcSubscriptions": + suite "JSON-RPC Subscriptions (" & url & ")": - test "can be instantiated with an http client": - let client = newRpcHttpClient() - let subscriptions = JsonRpcSubscriptions.new(client) - check not isNil subscriptions + var provider: JsonRpcProvider - test "can be instantiated with a websocket client": - let client = newRpcWebSocketClient() - let subscriptions = JsonRpcSubscriptions.new(client) - check not isNil subscriptions + setup: + provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis) -template subscriptionTests(subscriptions, client) = + teardown: + await provider.close() - test "subscribes to new blocks": - var latestBlock: Block - proc callback(blck: ?!Block) = - latestBlock = blck.value - let subscription = await subscriptions.subscribeBlocks(callback) - discard await client.call("evm_mine", newJArray()) - check eventually latestBlock.number.isSome - check latestBlock.hash.isSome - check latestBlock.timestamp > 0.u256 - await subscriptions.unsubscribe(subscription) + test "subscribes to new blocks": + var latestBlock: Block + proc callback(blck: ?!Block) = + latestBlock = blck.value + let subscription = await provider.subscribe(callback) + discard await provider.send("evm_mine") + check eventually latestBlock.number.isSome + check latestBlock.hash.isSome + check latestBlock.timestamp > 0.u256 + await subscription.unsubscribe() - test "stops listening to new blocks when unsubscribed": - var count = 0 - proc callback(blck: ?!Block) = - if blck.isOk: - inc count - let subscription = await subscriptions.subscribeBlocks(callback) - discard await client.call("evm_mine", newJArray()) - check eventually count > 0 - await subscriptions.unsubscribe(subscription) - count = 0 - discard await client.call("evm_mine", newJArray()) - await sleepAsync(100.millis) - check count == 0 + test "stops listening to new blocks when unsubscribed": + var count = 0 + proc callback(blck: ?!Block) = + if blck.isOk: + inc count + let subscription = await provider.subscribe(callback) + discard await provider.send("evm_mine") + check eventually count > 0 + await subscription.unsubscribe() + count = 0 + discard await provider.send("evm_mine") + await sleepAsync(200.millis) + check count == 0 - test "unsubscribing from a non-existent subscription does not do any harm": - await subscriptions.unsubscribe(newJInt(0)) + test "duplicate unsubscribe is harmless": + proc callback(blck: ?!Block) = discard + let subscription = await provider.subscribe(callback) + await subscription.unsubscribe() + await subscription.unsubscribe() - test "duplicate unsubscribe is harmless": - proc callback(blck: ?!Block) = discard - let subscription = await subscriptions.subscribeBlocks(callback) - await subscriptions.unsubscribe(subscription) - await subscriptions.unsubscribe(subscription) + test "stops listening to new blocks when provider is closed": + var count = 0 + proc callback(blck: ?!Block) = + if blck.isOk: + inc count + discard await provider.subscribe(callback) + discard await provider.send("evm_mine") + check eventually count > 0 + await provider.close() + count = 0 + provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis) + discard await provider.send("evm_mine") + await sleepAsync(200.millis) + check count == 0 - test "stops listening to new blocks when provider is closed": - var count = 0 - proc callback(blck: ?!Block) = - if blck.isOk: - inc count - discard await subscriptions.subscribeBlocks(callback) - discard await client.call("evm_mine", newJArray()) - check eventually count > 0 - await subscriptions.close() - count = 0 - discard await client.call("evm_mine", newJArray()) - await sleepAsync(100.millis) - check count == 0 - -suite "Web socket subscriptions": - - var subscriptions: JsonRpcSubscriptions - var client: RpcWebSocketClient - - setup: - client = newRpcWebSocketClient() - await client.connect("ws://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")) - subscriptions = JsonRpcSubscriptions.new(client) - subscriptions.start() - - teardown: - await subscriptions.close() - await client.close() - - subscriptionTests(subscriptions, client) - -suite "HTTP polling subscriptions": - - var subscriptions: JsonRpcSubscriptions - var client: RpcHttpClient - - setup: - client = newRpcHttpClient() - await client.connect("http://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")) - subscriptions = JsonRpcSubscriptions.new(client, - pollingInterval = 100.millis) - subscriptions.start() - - teardown: - await subscriptions.close() - await client.close() - - subscriptionTests(subscriptions, client) - -suite "HTTP polling subscriptions - mock tests": - - var subscriptions: PollingSubscriptions - var client: RpcHttpClient - var mockServer: MockRpcHttpServer - - privateAccess(PollingSubscriptions) - privateAccess(JsonRpcSubscriptions) - - proc startServer() {.async.} = - mockServer = MockRpcHttpServer.new() - mockServer.start() - await client.connect("http://" & $mockServer.localAddress()[0]) - - proc stopServer() {.async.} = - await mockServer.stop() - - setup: - client = newRpcHttpClient() - await startServer() - - subscriptions = PollingSubscriptions( - JsonRpcSubscriptions.new( - client, - pollingInterval = 1.millis)) - subscriptions.start() - - teardown: - await subscriptions.close() - await client.close() - await mockServer.stop() - - test "filter not found error recreates log filter": - let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) - let emptyHandler = proc(log: ?!Log) = discard - - check subscriptions.logFilters.len == 0 - check subscriptions.subscriptionMapping.len == 0 - - let id = await subscriptions.subscribeLogs(filter, emptyHandler) - - check subscriptions.logFilters[id] == filter - check subscriptions.subscriptionMapping[id] == id - check subscriptions.logFilters.len == 1 - check subscriptions.subscriptionMapping.len == 1 - - mockServer.invalidateFilter(id) - - check eventually subscriptions.subscriptionMapping[id] != id - - test "recreated log filter can be still unsubscribed using the original id": - let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) - let emptyHandler = proc(log: ?!Log) = discard - let id = await subscriptions.subscribeLogs(filter, emptyHandler) - mockServer.invalidateFilter(id) - check eventually subscriptions.subscriptionMapping[id] != id - - await subscriptions.unsubscribe(id) - - check not subscriptions.logFilters.hasKey id - check not subscriptions.subscriptionMapping.hasKey id - - test "filter not found error recreates block filter": - let emptyHandler = proc(blck: ?!Block) = discard - - check subscriptions.subscriptionMapping.len == 0 - let id = await subscriptions.subscribeBlocks(emptyHandler) - check subscriptions.subscriptionMapping[id] == id - - mockServer.invalidateFilter(id) - - check eventually subscriptions.subscriptionMapping[id] != id - - test "recreated block filter can be still unsubscribed using the original id": - let emptyHandler = proc(blck: ?!Block) = discard - let id = await subscriptions.subscribeBlocks(emptyHandler) - mockServer.invalidateFilter(id) - check eventually subscriptions.subscriptionMapping[id] != id - - await subscriptions.unsubscribe(id) - - check not subscriptions.subscriptionMapping.hasKey id - - test "polling continues with new filter after temporary error": - let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) - let emptyHandler = proc(log: ?!Log) = discard - - let id = await subscriptions.subscribeLogs(filter, emptyHandler) - - await stopServer() - mockServer.invalidateFilter(id) - await sleepAsync(50.milliseconds) - await startServer() - - check eventually subscriptions.subscriptionMapping[id] != id - - test "calls callback with failed result on error": - let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) - var failedResultReceived = false - - proc handler(log: ?!Log) = - if log.isErr: - failedResultReceived = true - - discard await subscriptions.subscribeLogs(filter, handler) - - await sleepAsync(50.milliseconds) - mockServer.nextGetChangesReturnsError = true - check eventually failedResultReceived diff --git a/testmodule/providers/jsonrpc/testWsResubscription.nim b/testmodule/providers/jsonrpc/testWsResubscription.nim deleted file mode 100644 index db09237..0000000 --- a/testmodule/providers/jsonrpc/testWsResubscription.nim +++ /dev/null @@ -1,56 +0,0 @@ -import std/os -import std/importutils -import pkg/asynctest/chronos/unittest -import pkg/json_rpc/rpcclient -import ethers/provider -import ethers/providers/jsonrpc/subscriptions - -import ../../examples - -suite "Websocket re-subscriptions": - privateAccess(JsonRpcSubscriptions) - - var subscriptions: JsonRpcSubscriptions - var client: RpcWebSocketClient - var resubscribeInterval: int - - setup: - resubscribeInterval = 3 - client = newRpcWebSocketClient() - await client.connect("ws://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")) - subscriptions = JsonRpcSubscriptions.new(client, resubscribeInterval = resubscribeInterval) - subscriptions.start() - - teardown: - await subscriptions.close() - await client.close() - - test "unsubscribing from a log filter while subscriptions are being resubscribed does not cause a concurrency error": - let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) - let emptyHandler = proc(log: ?!Log) = discard - - for i in 1..10: - discard await subscriptions.subscribeLogs(filter, emptyHandler) - - # Wait until the re-subscription starts - await sleepAsync(resubscribeInterval.seconds) - - # Attempt to modify callbacks while its being iterated - discard await subscriptions.subscribeLogs(filter, emptyHandler) - - test "resubscribe events take effect with new subscription IDs in the log filters": - let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) - let emptyHandler = proc(log: ?!Log) = discard - let id = await subscriptions.subscribeLogs(filter, emptyHandler) - - check id in subscriptions.logFilters - check subscriptions.logFilters.len == 1 - - # Make sure the subscription is done - await sleepAsync((resubscribeInterval + 1).seconds) - - # The previous subscription should not be in the log filters - check id notin subscriptions.logFilters - - # There is still one subscription which is the new one - check subscriptions.logFilters.len == 1 diff --git a/testmodule/providers/testJsonRpc.nim b/testmodule/providers/testJsonRpc.nim index 660c921..8e7de1d 100644 --- a/testmodule/providers/testJsonRpc.nim +++ b/testmodule/providers/testJsonRpc.nim @@ -1,7 +1,6 @@ import ./jsonrpc/testJsonRpcProvider import ./jsonrpc/testJsonRpcSigner import ./jsonrpc/testJsonRpcSubscriptions -import ./jsonrpc/testWsResubscription import ./jsonrpc/testConversions import ./jsonrpc/testErrors diff --git a/testmodule/testContracts.nim b/testmodule/testContracts.nim index f1e70d1..487b186 100644 --- a/testmodule/testContracts.nim +++ b/testmodule/testContracts.nim @@ -28,7 +28,7 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]: var accounts: seq[Address] setup: - provider = JsonRpcProvider.new(url, pollingInterval = 100.millis) + provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis) snapshot = await provider.send("evm_snapshot") accounts = await provider.listAccounts() let deployment = readDeployment() diff --git a/testmodule/testCustomErrors.nim b/testmodule/testCustomErrors.nim index 884bc31..c6db6e9 100644 --- a/testmodule/testCustomErrors.nim +++ b/testmodule/testCustomErrors.nim @@ -23,10 +23,10 @@ suite "Contract custom errors": var contract: TestCustomErrors var provider: JsonRpcProvider var snapshot: JsonNode - let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") + let url = "http://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") setup: - provider = JsonRpcProvider.new("http://" & providerUrl) + provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis) snapshot = await provider.send("evm_snapshot") let deployment = readDeployment() let address = !deployment.address(TestCustomErrors) diff --git a/testmodule/testEnums.nim b/testmodule/testEnums.nim index b5426ed..5bcb082 100644 --- a/testmodule/testEnums.nim +++ b/testmodule/testEnums.nim @@ -15,10 +15,10 @@ suite "Contract enum parameters and return values": var contract: TestEnums var provider: JsonRpcProvider var snapshot: JsonNode - let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") + let url = "http://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") setup: - provider = JsonRpcProvider.new("http://" & providerUrl) + provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis) snapshot = await provider.send("evm_snapshot") let deployment = readDeployment() contract = TestEnums.new(!deployment.address(TestEnums), provider) diff --git a/testmodule/testErc20.nim b/testmodule/testErc20.nim index 8bcb167..07e3c4c 100644 --- a/testmodule/testErc20.nim +++ b/testmodule/testErc20.nim @@ -24,7 +24,7 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]: var accounts: seq[Address] setup: - provider = JsonRpcProvider.new(url, pollingInterval = 100.millis) + provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis) snapshot = await provider.send("evm_snapshot") accounts = await provider.listAccounts() let deployment = readDeployment() diff --git a/testmodule/testGasEstimation.nim b/testmodule/testGasEstimation.nim index a7d2df2..638aba7 100644 --- a/testmodule/testGasEstimation.nim +++ b/testmodule/testGasEstimation.nim @@ -15,10 +15,10 @@ suite "gas estimation": var contract: TestGasEstimation var provider: JsonRpcProvider var snapshot: JsonNode - let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") + let url = "http://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") setup: - provider = JsonRpcProvider.new("http://" & providerUrl) + provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis) snapshot = await provider.send("evm_snapshot") let deployment = readDeployment() let signer = provider.getSigner() diff --git a/testmodule/testReturns.nim b/testmodule/testReturns.nim index 773b98c..0e0b4d0 100644 --- a/testmodule/testReturns.nim +++ b/testmodule/testReturns.nim @@ -14,10 +14,10 @@ suite "Contract return values": var contract: TestReturns var provider: JsonRpcProvider var snapshot: JsonNode - let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") + let url = "http://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") setup: - provider = JsonRpcProvider.new("http://" & providerUrl) + provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis) snapshot = await provider.send("evm_snapshot") let deployment = readDeployment() contract = TestReturns.new(!deployment.address(TestReturns), provider) diff --git a/testmodule/testTesting.nim b/testmodule/testTesting.nim index 4c94180..f92ef3a 100644 --- a/testmodule/testTesting.nim +++ b/testmodule/testTesting.nim @@ -93,10 +93,10 @@ suite "Testing helpers - contracts": var snapshot: JsonNode var accounts: seq[Address] let revertReason = "revert reason" - let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") + let url = "ws://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") setup: - provider = JsonRpcProvider.new("ws://" & providerUrl) + provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis) snapshot = await provider.send("evm_snapshot") accounts = await provider.listAccounts() helpersContract = TestHelpers.new(provider.getSigner()) diff --git a/testmodule/testWallet.nim b/testmodule/testWallet.nim index 21b5018..47f48f3 100644 --- a/testmodule/testWallet.nim +++ b/testmodule/testWallet.nim @@ -13,10 +13,10 @@ proc transfer*(erc20: Erc20, recipient: Address, amount: UInt256) {.contract.} suite "Wallet": var provider: JsonRpcProvider var snapshot: JsonNode - let providerUrl = getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") + let url = "http://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545") setup: - provider = JsonRpcProvider.new("http://" & providerUrl) + provider = await JsonRpcProvider.connect(url, pollingInterval = 100.millis) snapshot = await provider.send("evm_snapshot") teardown: @@ -31,13 +31,12 @@ suite "Wallet": check isSuccess Wallet.new("0x" & pk1) test "Can create Wallet with provider": - let provider = JsonRpcProvider.new() check isSuccess Wallet.new(pk1, provider) discard Wallet.new(PrivateKey.fromHex(pk1).get, provider) test "Cannot create wallet with invalid key string": check isFailure Wallet.new("0xInvalidKey") - check isFailure Wallet.new("0xInvalidKey", JsonRpcProvider.new()) + check isFailure Wallet.new("0xInvalidKey", provider) test "Can connect Wallet to provider": let wallet = !Wallet.new(pk1) From ca30aa7e4d52ba1e0a5224f894326d6fe435e690 Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Tue, 9 Sep 2025 17:25:46 +0200 Subject: [PATCH 10/13] fix: add missing .base. pragma --- ethers/signer.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethers/signer.nim b/ethers/signer.nim index e38509e..a657566 100644 --- a/ethers/signer.nim +++ b/ethers/signer.nim @@ -58,7 +58,7 @@ method getGasPrice*( method getMaxPriorityFeePerGas*( signer: Signer -): Future[UInt256] {.async: (raises: [SignerError, CancelledError]).} = +): Future[UInt256] {.base, async: (raises: [SignerError, CancelledError]).} = return await signer.provider.getMaxPriorityFeePerGas() method getTransactionCount*( From c4b31302139632de2c23631294e0b699780c9dec Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Wed, 10 Sep 2025 10:18:10 +0200 Subject: [PATCH 11/13] refactor(subscriptions): cleanup tx.confirm() --- ethers/provider.nim | 69 +++++++++++++-------------------------------- 1 file changed, 20 insertions(+), 49 deletions(-) diff --git a/ethers/provider.nim b/ethers/provider.nim index 9ceb217..8b2b1c1 100644 --- a/ethers/provider.nim +++ b/ethers/provider.nim @@ -235,69 +235,40 @@ proc confirm*( assert confirmations > 0 - var blockNumber: UInt256 - - ## We need initialized succesfull Result, because the first iteration of the `while` loop - ## bellow is triggered "manually" by calling `await updateBlockNumber` and not by block - ## subscription. If left uninitialized then the Result is in error state and error is raised. - ## This result is not used for block value, but for block subscription errors. - var blockSubscriptionResult: ?!Block = success(Block(number: UInt256.none, timestamp: 0.u256, hash: BlockHash.none)) + var blockNumber = await tx.provider.getBlockNumber() let blockEvent = newAsyncEvent() - - proc updateBlockNumber {.async: (raises: []).} = - try: - let number = await tx.provider.getBlockNumber() - if number > blockNumber: - blockNumber = number - blockEvent.fire() - except ProviderError, CancelledError: - # there's nothing we can do here - discard + blockEvent.fire() proc onBlock(blckResult: ?!Block) = - blockSubscriptionResult = blckResult - - if blckResult.isErr: + if blck =? blckResult and number =? blck.number: + blockNumber = number blockEvent.fire() - return - # ignore block parameter; hardhat may call this with pending blocks - asyncSpawn updateBlockNumber() - - await updateBlockNumber() let subscription = await tx.provider.subscribe(onBlock) let finish = blockNumber + timeout.u256 - var receipt: ?TransactionReceipt - while true: - await blockEvent.wait() - blockEvent.clear() + try: + var receipt: ?TransactionReceipt - if blockSubscriptionResult.isErr: - let error = blockSubscriptionResult.error() + while true: + await blockEvent.wait() + blockEvent.clear() - if error of SubscriptionError: - raise (ref SubscriptionError)(error) - elif error of CancelledError: - raise (ref CancelledError)(error) - else: - raise error.toErr(ProviderError) + if blockNumber >= finish: + raise newException(EthersError, "tx not mined before timeout") - if blockNumber >= finish: - await subscription.unsubscribe() - raise newException(EthersError, "tx not mined before timeout") + if receipt.?blockNumber.isNone: + receipt = await tx.provider.getTransactionReceipt(tx.hash) - if receipt.?blockNumber.isNone: - receipt = await tx.provider.getTransactionReceipt(tx.hash) + without receipt =? receipt and txBlockNumber =? receipt.blockNumber: + continue - without receipt =? receipt and txBlockNumber =? receipt.blockNumber: - continue - - if txBlockNumber + confirmations.u256 <= blockNumber + 1: - await subscription.unsubscribe() - await tx.provider.ensureSuccess(receipt) - return receipt + if txBlockNumber + confirmations.u256 <= blockNumber + 1: + await tx.provider.ensureSuccess(receipt) + return receipt + finally: + await subscription.unsubscribe() proc confirm*( tx: Future[TransactionResponse], From 248965f7f655fa74b4aea53f9660f6ad1dbc73f5 Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Wed, 10 Sep 2025 10:29:49 +0200 Subject: [PATCH 12/13] refactor!(subscriptions): remove ?! from callbacks --- Readme.md | 7 ++----- ethers/contracts/filters.nim | 10 +++------- ethers/provider.nim | 8 ++++---- ethers/subscriptions.nim | 4 ++-- .../providers/jsonrpc/testJsonRpcProvider.nim | 2 +- .../jsonrpc/testJsonRpcSubscriptions.nim | 17 +++++++---------- testmodule/testContracts.nim | 10 +++------- 7 files changed, 22 insertions(+), 36 deletions(-) diff --git a/Readme.md b/Readme.md index b1ff00e..30bb259 100644 --- a/Readme.md +++ b/Readme.md @@ -137,11 +137,8 @@ You can now subscribe to Transfer events by calling `subscribe` on the contract instance. ```nim -proc handleTransfer(transferResult: ?!Transfer) = - if transferResult.isOk: - echo "received transfer: ", transferResult.value - else: - echo "error during transfer: ", transferResult.error.msg +proc handleTransfer(transferResult: Transfer) = + echo "received transfer: ", transferResult.value let subscription = await token.subscribe(Transfer, handleTransfer) ``` diff --git a/ethers/contracts/filters.nim b/ethers/contracts/filters.nim index 6e6c0cf..09d2ca0 100644 --- a/ethers/contracts/filters.nim +++ b/ethers/contracts/filters.nim @@ -6,7 +6,7 @@ import ./contract import ./events import ./fields -type EventHandler*[E: Event] = proc(event: ?!E) {.gcsafe, raises:[].} +type EventHandler*[E: Event] = proc(event: E) {.gcsafe, raises:[].} proc subscribe*[E: Event](contract: Contract, _: type E, @@ -16,13 +16,9 @@ proc subscribe*[E: Event](contract: Contract, let topic = topic($E, E.fieldTypes).toArray let filter = EventFilter(address: contract.address, topics: @[topic]) - proc logHandler(logResult: ?!Log) {.raises: [].} = - without log =? logResult, error: - handler(failure(E, error)) - return - + proc logHandler(log: Log) {.raises: [].} = if event =? E.decode(log.data, log.topics): - handler(success(event)) + handler(event) contract.provider.subscribe(filter, logHandler) diff --git a/ethers/provider.nim b/ethers/provider.nim index 8b2b1c1..d3db60f 100644 --- a/ethers/provider.nim +++ b/ethers/provider.nim @@ -57,8 +57,8 @@ type effectiveGasPrice*: ?UInt256 status*: TransactionStatus transactionType* {.serialize("type"), deserialize("type").}: TransactionType - LogHandler* = proc(log: ?!Log) {.gcsafe, raises:[].} - BlockHandler* = proc(blck: ?!Block) {.gcsafe, raises:[].} + LogHandler* = proc(log: Log) {.gcsafe, raises:[].} + BlockHandler* = proc(blck: Block) {.gcsafe, raises:[].} Topic* = array[32, byte] Block* {.serialize.} = object number*: ?BlockNumber @@ -239,8 +239,8 @@ proc confirm*( let blockEvent = newAsyncEvent() blockEvent.fire() - proc onBlock(blckResult: ?!Block) = - if blck =? blckResult and number =? blck.number: + proc onBlock(blck: Block) = + if number =? blck.number: blockNumber = number blockEvent.fire() diff --git a/ethers/subscriptions.nim b/ethers/subscriptions.nim index b71001e..eb661b3 100644 --- a/ethers/subscriptions.nim +++ b/ethers/subscriptions.nim @@ -92,11 +92,11 @@ proc processBlock( return false let logs = await subscriptions.getLogs(blck) for handler in subscriptions.blockSubscriptions.values: - handler(success blck) + handler(blck) for (id, logs) in logs.pairs: if (_, handler) =? subscriptions.logSubscriptions.?[id]: for log in logs: - handler(success log) + handler(log) return true except ProviderError: return false diff --git a/testmodule/providers/jsonrpc/testJsonRpcProvider.nim b/testmodule/providers/jsonrpc/testJsonRpcProvider.nim index fae1e07..133060f 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcProvider.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcProvider.nim @@ -49,7 +49,7 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]: let oldBlock = !await provider.getBlock(BlockTag.latest) discard await provider.send("evm_mine") var newBlock: Block - let blockHandler = proc(blck: ?!Block) {.raises:[].}= newBlock = blck.value + let blockHandler = proc(blck: Block) = newBlock = blck let subscription = await provider.subscribe(blockHandler) discard await provider.send("evm_mine") check eventually newBlock.number.isSome diff --git a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim index 4a885f3..f26c647 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim @@ -1,6 +1,5 @@ import std/os import pkg/asynctest/chronos/unittest -import pkg/serde import pkg/json_rpc/rpcclient import pkg/json_rpc/rpcserver import ethers/providers/jsonrpc @@ -20,8 +19,8 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]: test "subscribes to new blocks": var latestBlock: Block - proc callback(blck: ?!Block) = - latestBlock = blck.value + proc callback(blck: Block) = + latestBlock = blck let subscription = await provider.subscribe(callback) discard await provider.send("evm_mine") check eventually latestBlock.number.isSome @@ -31,9 +30,8 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]: test "stops listening to new blocks when unsubscribed": var count = 0 - proc callback(blck: ?!Block) = - if blck.isOk: - inc count + proc callback(blck: Block) = + inc count let subscription = await provider.subscribe(callback) discard await provider.send("evm_mine") check eventually count > 0 @@ -44,16 +42,15 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]: check count == 0 test "duplicate unsubscribe is harmless": - proc callback(blck: ?!Block) = discard + proc callback(blck: Block) = discard let subscription = await provider.subscribe(callback) await subscription.unsubscribe() await subscription.unsubscribe() test "stops listening to new blocks when provider is closed": var count = 0 - proc callback(blck: ?!Block) = - if blck.isOk: - inc count + proc callback(blck: Block) = + inc count discard await provider.subscribe(callback) discard await provider.send("evm_mine") check eventually count > 0 diff --git a/testmodule/testContracts.nim b/testmodule/testContracts.nim index 487b186..bca0146 100644 --- a/testmodule/testContracts.nim +++ b/testmodule/testContracts.nim @@ -157,10 +157,7 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]: test "receives events when subscribed": var transfers: seq[Transfer] - proc handleTransfer(transferRes: ?!Transfer) = - without transfer =? transferRes, error: - echo error.msg - + proc handleTransfer(transfer: Transfer) = transfers.add(transfer) let signer0 = provider.getSigner(accounts[0]) @@ -182,9 +179,8 @@ for url in ["ws://" & providerUrl, "http://" & providerUrl]: test "stops receiving events when unsubscribed": var transfers: seq[Transfer] - proc handleTransfer(transferRes: ?!Transfer) = - if transfer =? transferRes: - transfers.add(transfer) + proc handleTransfer(transfer: Transfer) = + transfers.add(transfer) let signer0 = provider.getSigner(accounts[0]) From 86b9a02054833816d704f39cc4f5e3fcd073481f Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Wed, 10 Sep 2025 11:29:19 +0200 Subject: [PATCH 13/13] feat(subscriptions): use websocket to get instant updates --- ethers/providers/jsonrpc.nim | 2 ++ ethers/providers/jsonrpc/websocket.nim | 34 ++++++++++++++++++++++++ ethers/subscriptions/blocksubscriber.nim | 5 ++-- 3 files changed, 39 insertions(+), 2 deletions(-) create mode 100644 ethers/providers/jsonrpc/websocket.nim diff --git a/ethers/providers/jsonrpc.nim b/ethers/providers/jsonrpc.nim index 004c931..6f2f538 100644 --- a/ethers/providers/jsonrpc.nim +++ b/ethers/providers/jsonrpc.nim @@ -12,6 +12,7 @@ import ../signer import ./jsonrpc/rpccalls import ./jsonrpc/conversions import ./jsonrpc/errors +import ./jsonrpc/websocket export basics export provider @@ -49,6 +50,7 @@ proc connect*( await websocket.connect(url) provider.client = websocket provider.subscriptions = Subscriptions.new(provider, pollingInterval) + await provider.subscriptions.useWebsocketUpdates(websocket) else: let http = newRpcHttpClient(getHeaders = jsonHeaders) await http.connect(url) diff --git a/ethers/providers/jsonrpc/websocket.nim b/ethers/providers/jsonrpc/websocket.nim new file mode 100644 index 0000000..cbf4299 --- /dev/null +++ b/ethers/providers/jsonrpc/websocket.nim @@ -0,0 +1,34 @@ +import std/json +import pkg/json_rpc/rpcclient +import ../../basics +import ../../subscriptions +import ./rpccalls +import ./errors + +proc useWebsocketUpdates*( + subscriptions: Subscriptions, + websocket: RpcWebSocketClient +) {.async:(raises:[JsonRpcProviderError, CancelledError]).} = + var rpcSubscriptionId: JsonNode + + proc processMessage(client: RpcClient, message: string): Result[bool, string] = + without message =? parseJson(message).catch: + return ok true + without rpcMethod =? message{"method"}: + return ok true + if rpcMethod.getStr() != "eth_subscription": + return ok true + without rpcParameter =? message{"params"}{"subscription"}: + return ok true + if rpcParameter != rpcSubscriptionId: + return ok true + + subscriptions.update() + + ok false # do not process further using json-rpc default handler + + assert websocket.onProcessMessage.isNil + websocket.onProcessMessage = processMessage + + convertError: + rpcSubscriptionId = await websocket.eth_subscribe("newHeads") diff --git a/ethers/subscriptions/blocksubscriber.nim b/ethers/subscriptions/blocksubscriber.nim index 305cd93..f1e6cc3 100644 --- a/ethers/subscriptions/blocksubscriber.nim +++ b/ethers/subscriptions/blocksubscriber.nim @@ -22,7 +22,8 @@ func new*( BlockSubscriber( provider: provider, processor: processor, - pollingInterval: pollingInterval + pollingInterval: pollingInterval, + wake: newAsyncEvent() ) proc sleep(subscriber: BlockSubscriber) {.async:(raises:[CancelledError]).} = @@ -51,7 +52,7 @@ proc start*( if subscriber.looping.isNil: subscriber.lastSeen = await subscriber.provider.getBlockNumber() subscriber.lastProcessed = subscriber.lastSeen - subscriber.wake = newAsyncEvent() + subscriber.wake.clear() subscriber.looping = subscriber.loop() proc stop*(subscriber: BlockSubscriber) {.async:(raises:[]).} =