From af3d7379c8792ebe24ef2e13dad8f9952f525f69 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Thu, 10 Apr 2025 10:48:41 +0200 Subject: [PATCH] chore: add ws resubscription for hardhat workaround (#112) * Move logFilters to JsonRpcSubscriptions * Add resubscribe flag * Add documentation for the resubscribe symbol * Rename the symbol for better clarity * Provide better message * Add nimbledeps to git ignore * Update wording * Update wording * Remove the ws_resubscribe flag from the config * Handle the concurrency issues when updating the logFilters and add tests * Update log filters comment * Add lock when subscribing to blocks * Remove useless private access * Fix wording * Fix try except format * Restore privateAccess because logEvents moved to JsonRpcSubscriptions * Use seconds instead of milliseconds * Remove extra dot in test label * Restore new lines * Pass the resubscribe internal in new function and remove unneeded try except * Remove ws_resubscribe default value making testing easier * Remove unneeded condition * Add new line * Fix nim syntax * Update symbol description * Log warning when the resubscription interval is more than 300 seconds * Catch errors in close method * Redefine raises for async pragma in close methods * Provide better error message --- .gitignore | 1 + Readme.md | 7 ++ ethers/providers/jsonrpc/subscriptions.nim | 100 ++++++++++++++---- .../jsonrpc/testJsonRpcSubscriptions.nim | 1 + .../jsonrpc/testWsResubscription.nim | 57 ++++++++++ testmodule/providers/testJsonRpc.nim | 1 + 6 files changed, 148 insertions(+), 19 deletions(-) create mode 100644 testmodule/providers/jsonrpc/testWsResubscription.nim diff --git a/.gitignore b/.gitignore index 332b1f5..ad73512 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ nimble.paths .idea .nimble .envrc +nimbledeps diff --git a/Readme.md b/Readme.md index 9952cee..809111e 100644 --- a/Readme.md +++ b/Readme.md @@ -204,6 +204,13 @@ This library ships with some optional modules that provides convenience utilitie - `ethers/erc20` module provides you with ERC20 token implementation and its events +Hardhat websockets workaround +--------- + +If you're working with Hardhat, you might encounter an issue where [websocket subscriptions stop working after 5 minutes](https://github.com/NomicFoundation/hardhat/issues/2053). + +This library provides a workaround using the compile time `ws_resubscribe` symbol. When this symbol is defined and set to a value greater than 0, websocket subscriptions will automatically resubscribe after the amount of time (in seconds) specified. The recommended value is 240 seconds (4 minutes), eg `--define:ws_resubscribe=240`. + Contribution ------------ diff --git a/ethers/providers/jsonrpc/subscriptions.nim b/ethers/providers/jsonrpc/subscriptions.nim index f628330..95c094e 100644 --- a/ethers/providers/jsonrpc/subscriptions.nim +++ b/ethers/providers/jsonrpc/subscriptions.nim @@ -19,6 +19,12 @@ type client: RpcClient callbacks: Table[JsonNode, SubscriptionCallback] methodHandlers: Table[string, MethodHandler] + # Used by both PollingSubscriptions and WebsocketSubscriptions to store + # subscription filters so the subscriptions can be recreated. With + # PollingSubscriptions, the RPC node might prune/forget about them, and with + # WebsocketSubscriptions, when using hardhat, subscriptions are dropped after 5 + # minutes. + logFilters: Table[JsonNode, EventFilter] MethodHandler* = proc (j: JsonNode) {.gcsafe, raises: [].} SubscriptionCallback = proc(id: JsonNode, arguments: ?!JsonNode) {.gcsafe, raises:[].} @@ -53,7 +59,6 @@ func start*(subscriptions: JsonRpcSubscriptions) = # true = continue processing message using json_rpc's default message handler return ok true - proc setMethodHandler( subscriptions: JsonRpcSubscriptions, `method`: string, @@ -79,10 +84,13 @@ method unsubscribe*(subscriptions: JsonRpcSubscriptions, {.async: (raises: [CancelledError]), base.} = raiseAssert "not implemented " -method close*(subscriptions: JsonRpcSubscriptions) {.async: (raises: [SubscriptionError, CancelledError]), base.} = +method close*(subscriptions: JsonRpcSubscriptions) {.async: (raises: []), base.} = let ids = toSeq subscriptions.callbacks.keys for id in ids: - await subscriptions.unsubscribe(id) + try: + await subscriptions.unsubscribe(id) + except CatchableError as e: + error "JsonRpc unsubscription failed", error = e.msg, id = id proc getCallback(subscriptions: JsonRpcSubscriptions, id: JsonNode): ?SubscriptionCallback {. raises:[].} = @@ -93,18 +101,68 @@ proc getCallback(subscriptions: JsonRpcSubscriptions, # Web sockets +# Default re-subscription period is seconds +const WsResubscribe {.intdefine.}: int = 0 + type WebSocketSubscriptions = ref object of JsonRpcSubscriptions + logFiltersLock: AsyncLock + resubscribeFut: Future[void] + resubscribeInterval: int + +template withLock*(subscriptions: WebSocketSubscriptions, body: untyped) = + if subscriptions.logFiltersLock.isNil: + subscriptions.logFiltersLock = newAsyncLock() + + await subscriptions.logFiltersLock.acquire() + try: + body + finally: + subscriptions.logFiltersLock.release() + +# This is a workaround to manage the 5 minutes limit due to hardhat. +# See https://github.com/NomicFoundation/hardhat/issues/2053#issuecomment-1061374064 +proc resubscribeWebsocketEventsOnTimeout*(subscriptions: WebsocketSubscriptions) {.async: (raises: [CancelledError]).} = + while true: + await sleepAsync(subscriptions.resubscribeInterval.seconds) + try: + withLock(subscriptions): + for id, callback in subscriptions.callbacks: + + var newId: JsonNode + if id in subscriptions.logFilters: + let filter = subscriptions.logFilters[id] + newId = await subscriptions.client.eth_subscribe("logs", filter) + subscriptions.logFilters[newId] = filter + subscriptions.logFilters.del(id) + else: + newId = await subscriptions.client.eth_subscribe("newHeads") + + subscriptions.callbacks[newId] = callback + subscriptions.callbacks.del(id) + discard await subscriptions.client.eth_unsubscribe(id) + except CancelledError as e: + raise e + except CatchableError as e: + error "WS resubscription failed" , error = e.msg proc new*(_: type JsonRpcSubscriptions, - client: RpcWebSocketClient): JsonRpcSubscriptions = + client: RpcWebSocketClient, + resubscribeInterval = WsResubscribe): JsonRpcSubscriptions = + let subscriptions = WebSocketSubscriptions(client: client, resubscribeInterval: resubscribeInterval) - let subscriptions = WebSocketSubscriptions(client: client) proc subscriptionHandler(arguments: JsonNode) {.raises:[].} = let id = arguments{"subscription"} or newJString("") if callback =? subscriptions.getCallback(id): callback(id, success(arguments)) subscriptions.setMethodHandler("eth_subscription", subscriptionHandler) + + if resubscribeInterval > 0: + if resubscribeInterval >= 300: + warn "Resubscription interval greater than 300 seconds is useless for hardhat workaround", resubscribeInterval = resubscribeInterval + + subscriptions.resubscribeFut = resubscribeWebsocketEventsOnTimeout(subscriptions) + subscriptions method subscribeBlocks(subscriptions: WebSocketSubscriptions, @@ -120,9 +178,10 @@ method subscribeBlocks(subscriptions: WebSocketSubscriptions, onBlock(res) convertErrorsToSubscriptionError: - let id = await subscriptions.client.eth_subscribe("newHeads") - subscriptions.callbacks[id] = callback - return id + withLock(subscriptions): + let id = await subscriptions.client.eth_subscribe("newHeads") + subscriptions.callbacks[id] = callback + return id method subscribeLogs(subscriptions: WebSocketSubscriptions, filter: EventFilter, @@ -138,33 +197,36 @@ method subscribeLogs(subscriptions: WebSocketSubscriptions, onLog(res) convertErrorsToSubscriptionError: - let id = await subscriptions.client.eth_subscribe("logs", filter) - subscriptions.callbacks[id] = callback - return id + withLock(subscriptions): + let id = await subscriptions.client.eth_subscribe("logs", filter) + subscriptions.callbacks[id] = callback + subscriptions.logFilters[id] = filter + return id method unsubscribe*(subscriptions: WebSocketSubscriptions, id: JsonNode) {.async: (raises: [CancelledError]).} = try: - subscriptions.callbacks.del(id) - discard await subscriptions.client.eth_unsubscribe(id) + withLock(subscriptions): + subscriptions.callbacks.del(id) + discard await subscriptions.client.eth_unsubscribe(id) except CancelledError as e: raise e except CatchableError: # Ignore if uninstallation of the subscribiton fails. discard +method close*(subscriptions: WebSocketSubscriptions) {.async: (raises: []).} = + await procCall JsonRpcSubscriptions(subscriptions).close() + if not subscriptions.resubscribeFut.isNil: + await subscriptions.resubscribeFut.cancelAndWait() + # Polling type PollingSubscriptions* = ref object of JsonRpcSubscriptions polling: Future[void] - # We need to keep around the filters that are used to create log filters on the RPC node - # as there might be a time when they need to be recreated as RPC node might prune/forget - # about them - logFilters: Table[JsonNode, EventFilter] - # Used when filters are recreated to translate from the id that user # originally got returned to new filter id subscriptionMapping: Table[JsonNode, JsonNode] @@ -244,7 +306,7 @@ proc new*(_: type JsonRpcSubscriptions, asyncSpawn subscriptions.polling subscriptions -method close*(subscriptions: PollingSubscriptions) {.async.} = +method close*(subscriptions: PollingSubscriptions) {.async: (raises: []).} = await subscriptions.polling.cancelAndWait() await procCall JsonRpcSubscriptions(subscriptions).close() diff --git a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim index cba22a4..2edc95f 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim @@ -114,6 +114,7 @@ suite "HTTP polling subscriptions - mock tests": var mockServer: MockRpcHttpServer privateAccess(PollingSubscriptions) + privateAccess(JsonRpcSubscriptions) proc startServer() {.async.} = mockServer = MockRpcHttpServer.new() diff --git a/testmodule/providers/jsonrpc/testWsResubscription.nim b/testmodule/providers/jsonrpc/testWsResubscription.nim new file mode 100644 index 0000000..e8c9f6a --- /dev/null +++ b/testmodule/providers/jsonrpc/testWsResubscription.nim @@ -0,0 +1,57 @@ +import std/os +import std/importutils +import pkg/asynctest +import pkg/json_rpc/rpcclient +import ethers/provider +import ethers/providers/jsonrpc/subscriptions + +import ../../examples +import ./rpc_mock + +suite "Websocket re-subscriptions": + privateAccess(JsonRpcSubscriptions) + + var subscriptions: JsonRpcSubscriptions + var client: RpcWebSocketClient + var resubscribeInterval: int + + setup: + resubscribeInterval = 3 + client = newRpcWebSocketClient() + await client.connect("ws://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")) + subscriptions = JsonRpcSubscriptions.new(client, resubscribeInterval = resubscribeInterval) + subscriptions.start() + + teardown: + await subscriptions.close() + await client.close() + + test "unsubscribing from a log filter while subscriptions are being resubscribed does not cause a concurrency error": + let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) + let emptyHandler = proc(log: ?!Log) = discard + + for i in 1..10: + discard await subscriptions.subscribeLogs(filter, emptyHandler) + + # Wait until the re-subscription starts + await sleepAsync(resubscribeInterval.seconds) + + # Attempt to modify callbacks while its being iterated + discard await subscriptions.subscribeLogs(filter, emptyHandler) + + test "resubscribe events take effect with new subscription IDs in the log filters": + let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) + let emptyHandler = proc(log: ?!Log) = discard + let id = await subscriptions.subscribeLogs(filter, emptyHandler) + + check id in subscriptions.logFilters + check subscriptions.logFilters.len == 1 + + # Make sure the subscription is done + await sleepAsync((resubscribeInterval + 1).seconds) + + # The previous subscription should not be in the log filters + check id notin subscriptions.logFilters + + # There is still one subscription which is the new one + check subscriptions.logFilters.len == 1 diff --git a/testmodule/providers/testJsonRpc.nim b/testmodule/providers/testJsonRpc.nim index 8e7de1d..660c921 100644 --- a/testmodule/providers/testJsonRpc.nim +++ b/testmodule/providers/testJsonRpc.nim @@ -1,6 +1,7 @@ import ./jsonrpc/testJsonRpcProvider import ./jsonrpc/testJsonRpcSigner import ./jsonrpc/testJsonRpcSubscriptions +import ./jsonrpc/testWsResubscription import ./jsonrpc/testConversions import ./jsonrpc/testErrors