diff --git a/Readme.md b/Readme.md index 8f9553b..2c053ce 100644 --- a/Readme.md +++ b/Readme.md @@ -209,7 +209,7 @@ Hardhat websockets workaround If you're working with Hardhat, you might encounter an issue where [websocket subscriptions stop working after 5 minutes](https://github.com/NomicFoundation/hardhat/issues/2053). -This library provides a workaround using the `--define:ws_resubscribe` option. When this symbol is defined, the subscriptions will automatically resubscribe after 4 minutes. +This library provides a workaround using the `--define:ws_resubscribe` option. When this symbol is defined, the subscriptions will automatically resubscribe after 240 seconds (4 minutes) by default. You can change this value using `--define:ws_resubscribe=180`. Contribution ------------ diff --git a/ethers.nimble b/ethers.nimble index ebd0d94..da84bb5 100644 --- a/ethers.nimble +++ b/ethers.nimble @@ -18,3 +18,4 @@ task test, "Run the test suite": # exec "nimble install -d -y" withDir "testmodule": exec "nimble test" + exec "nimble testWsResubscription" \ No newline at end of file diff --git a/ethers/providers/jsonrpc/subscriptions.nim b/ethers/providers/jsonrpc/subscriptions.nim index 7ae2f95..f869837 100644 --- a/ethers/providers/jsonrpc/subscriptions.nim +++ b/ethers/providers/jsonrpc/subscriptions.nim @@ -14,16 +14,18 @@ import ./conversions export serde +# Default re-subscription period is 240 seconds (4 minutes) +const WsResubscribe {.intdefine.}: int64 = 240 + type JsonRpcSubscriptions* = ref object of RootObj client: RpcClient callbacks: Table[JsonNode, SubscriptionCallback] methodHandlers: Table[string, MethodHandler] - # Used by both PollingSubscriptions and WebsocketSubscriptions to store - # subscription filters so the subscriptions can be recreated. With - # PollingSubscriptions, the RPC node might prune/forget about them, and with - # WebsocketSubscriptions, when using hardhat, subscriptions are dropped after 5 - # minutes. + # We need to keep around the filters that are used to create log filters on the RPC node + # as there might be a time when they need to be recreated as RPC node might prune/forget + # about them + # This is used of resubscribe all the subscriptions when using websocket with hardhat logFilters: Table[JsonNode, EventFilter] when defined(ws_resubscribe): resubscribeFut: Future[void] @@ -32,26 +34,6 @@ type {.push raises:[].} -when defined(ws_resubscribe): - # This is a workaround to manage the 5 minutes limit due to hardhat. - # See https://github.com/NomicFoundation/hardhat/issues/2053#issuecomment-1061374064 - proc resubscribeWebsocketEventsOnTimeout*(subscriptions: JsonRpcSubscriptions) {.async.} = - while true: - await sleepAsync(4.int64.minutes) - for id, callback in subscriptions.callbacks: - var newId: JsonNode - if id in subscriptions.logFilters: - let filter = subscriptions.logFilters[id] - newId = await subscriptions.client.eth_subscribe("logs", filter) - subscriptions.logFilters[newId] = filter - subscriptions.logFilters.del(id) - else: - newId = await subscriptions.client.eth_subscribe("newHeads") - - subscriptions.callbacks[newId] = callback - subscriptions.callbacks.del(id) - discard await subscriptions.client.eth_unsubscribe(id) - template convertErrorsToSubscriptionError(body) = try: body @@ -122,6 +104,7 @@ proc getCallback(subscriptions: JsonRpcSubscriptions, type WebSocketSubscriptions = ref object of JsonRpcSubscriptions + logFiltersLock: AsyncLock proc new*(_: type JsonRpcSubscriptions, client: RpcWebSocketClient): JsonRpcSubscriptions = @@ -138,6 +121,16 @@ proc new*(_: type JsonRpcSubscriptions, subscriptions +template withLock*(subscriptions: WebSocketSubscriptions, body: untyped) = + if subscriptions.logFiltersLock.isNil: + subscriptions.logFiltersLock = newAsyncLock() + + await subscriptions.logFiltersLock.acquire() + try: + body + finally: + subscriptions.logFiltersLock.release() + method subscribeBlocks(subscriptions: WebSocketSubscriptions, onBlock: BlockHandler): Future[JsonNode] @@ -168,18 +161,24 @@ method subscribeLogs(subscriptions: WebSocketSubscriptions, let res = Log.fromJson(arguments{"result"}).mapFailure(SubscriptionError) onLog(res) - convertErrorsToSubscriptionError: - let id = await subscriptions.client.eth_subscribe("logs", filter) - subscriptions.callbacks[id] = callback - subscriptions.logFilters[id] = filter - return id + try: + withLock(subscriptions): + convertErrorsToSubscriptionError: + let id = await subscriptions.client.eth_subscribe("logs", filter) + subscriptions.callbacks[id] = callback + subscriptions.logFilters[id] = filter + return id + except AsyncLockError as e: + error "Lock error when trying to subscribe to logs", err = e.msg + raise newException(SubscriptionError, "Cannot subscribe to the logs because of lock error") method unsubscribe*(subscriptions: WebSocketSubscriptions, id: JsonNode) {.async: (raises: [CancelledError]).} = try: - subscriptions.callbacks.del(id) - discard await subscriptions.client.eth_unsubscribe(id) + withLock(subscriptions): + subscriptions.callbacks.del(id) + discard await subscriptions.client.eth_unsubscribe(id) except CancelledError as e: raise e except CatchableError: @@ -188,9 +187,37 @@ method unsubscribe*(subscriptions: WebSocketSubscriptions, method close*(subscriptions: WebsocketSubscriptions) {.async.} = await procCall JsonRpcSubscriptions(subscriptions).close() - if not subscriptions.resubscribeFut.isNil: - await subscriptions.resubscribeFut.cancelAndWait() - + when defined(ws_resubscribe): + if not subscriptions.resubscribeFut.isNil: + await subscriptions.resubscribeFut.cancelAndWait() + +when defined(ws_resubscribe): + # This is a workaround to manage the 5 minutes limit due to hardhat. + # See https://github.com/NomicFoundation/hardhat/issues/2053#issuecomment-1061374064 + proc resubscribeWebsocketEventsOnTimeout*(subscriptions: WebsocketSubscriptions) {.async: (raises: [CancelledError]).} = + while true: + await sleepAsync(WsResubscribe.seconds) + try: + withLock(subscriptions): + for id, callback in subscriptions.callbacks: + + var newId: JsonNode + if id in subscriptions.logFilters: + let filter = subscriptions.logFilters[id] + newId = await subscriptions.client.eth_subscribe("logs", filter) + subscriptions.logFilters[newId] = filter + subscriptions.logFilters.del(id) + else: + newId = await subscriptions.client.eth_subscribe("newHeads") + + subscriptions.callbacks[newId] = callback + subscriptions.callbacks.del(id) + discard await subscriptions.client.eth_unsubscribe(id) + except CancelledError as e: + raise e + except CatchableError as e: + error "WS resubscription failed" , error = e.msg + # Polling type diff --git a/testmodule/providers/jsonrpc/testWsResubscription.nim b/testmodule/providers/jsonrpc/testWsResubscription.nim new file mode 100644 index 0000000..381b24e --- /dev/null +++ b/testmodule/providers/jsonrpc/testWsResubscription.nim @@ -0,0 +1,54 @@ +import std/os +import std/importutils +import pkg/asynctest +import pkg/json_rpc/rpcclient +import ethers/provider +import ethers/providers/jsonrpc/subscriptions + +import ../../examples +import ./rpc_mock + +suite "Web socket re-subscriptions": + privateAccess(JsonRpcSubscriptions) + + var subscriptions: JsonRpcSubscriptions + var client: RpcWebSocketClient + + setup: + client = newRpcWebSocketClient() + await client.connect("ws://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")) + subscriptions = JsonRpcSubscriptions.new(client) + subscriptions.start() + + teardown: + await subscriptions.close() + await client.close() + + test "unsubscribing from a log filter while subscriptions are being resubscribed does not cause a concurrency error.": + let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) + let emptyHandler = proc(log: ?!Log) = discard + + let subscription = await subscriptions.subscribeLogs(filter, emptyHandler) + + await sleepAsync(3000.int64.milliseconds) + + try: + await subscriptions.unsubscribe(subscription) + except CatchableError: + fail() + + test "resubscribe events take effect with new subscription IDs in the log filters": + let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) + let emptyHandler = proc(log: ?!Log) = discard + let id = await subscriptions.subscribeLogs(filter, emptyHandler) + + check id in subscriptions.logFilters + check subscriptions.logFilters.len == 1 + + await sleepAsync(4.int64.seconds) + + # The previous subscription should not be in the log filters + check not (id in subscriptions.logFilters) + + # There is still one subscription which is the new one + check subscriptions.logFilters.len == 1 diff --git a/testmodule/test.nimble b/testmodule/test.nimble index b0b7f02..72db518 100644 --- a/testmodule/test.nimble +++ b/testmodule/test.nimble @@ -8,3 +8,7 @@ requires "asynctest >= 0.4.0 & < 0.5.0" task test, "Run the test suite": exec "nimble install -d -y" exec "nim c -r test" + +task testWsResubscription, "Run the test suite": + exec "nimble install -d -y" + exec "nim c --define:ws_resubscribe=3 -r providers/jsonrpc/testWsResubscription"