diff --git a/ethers/providers/jsonrpc/subscriptions.nim b/ethers/providers/jsonrpc/subscriptions.nim index 2bba4d2..2f74013 100644 --- a/ethers/providers/jsonrpc/subscriptions.nim +++ b/ethers/providers/jsonrpc/subscriptions.nim @@ -15,7 +15,7 @@ import ./conversions export serde # Default re-subscription period is 240 seconds (4 minutes) -const WsResubscribe {.intdefine.}: int64 = 240 +const WsResubscribe {.intdefine.}: int = 240 type JsonRpcSubscriptions* = ref object of RootObj @@ -28,8 +28,6 @@ type # WebsocketSubscriptions, when using hardhat, subscriptions are dropped after 5 # minutes. logFilters: Table[JsonNode, EventFilter] - when defined(ws_resubscribe): - resubscribeFut: Future[void] MethodHandler* = proc (j: JsonNode) {.gcsafe, raises: [].} SubscriptionCallback = proc(id: JsonNode, arguments: ?!JsonNode) {.gcsafe, raises:[].} @@ -106,11 +104,18 @@ proc getCallback(subscriptions: JsonRpcSubscriptions, type WebSocketSubscriptions = ref object of JsonRpcSubscriptions logFiltersLock: AsyncLock + when defined(ws_resubscribe): + resubscribeFut: Future[void] + resubscribeInterval: int proc new*(_: type JsonRpcSubscriptions, - client: RpcWebSocketClient): JsonRpcSubscriptions = + client: RpcWebSocketClient, + resubscribeInterval = WsResubscribe): JsonRpcSubscriptions = + when defined(ws_resubscribe): + let subscriptions = WebSocketSubscriptions(client: client, resubscribeInterval: resubscribeInterval) + else: + let subscriptions = WebSocketSubscriptions(client: client) - let subscriptions = WebSocketSubscriptions(client: client) proc subscriptionHandler(arguments: JsonNode) {.raises:[].} = let id = arguments{"subscription"} or newJString("") if callback =? subscriptions.getCallback(id): @@ -144,15 +149,11 @@ method subscribeBlocks(subscriptions: WebSocketSubscriptions, let res = Block.fromJson(arguments{"result"}).mapFailure(SubscriptionError) onBlock(res) - try: + convertErrorsToSubscriptionError: withLock(subscriptions): - convertErrorsToSubscriptionError: - let id = await subscriptions.client.eth_subscribe("newHeads") - subscriptions.callbacks[id] = callback - return id - except AsyncLockError as e: - error "Lock error when trying to subscribe to blocks", err = e.msg - raise newException(SubscriptionError, "Cannot subscribe to the blocks because of lock error") + let id = await subscriptions.client.eth_subscribe("newHeads") + subscriptions.callbacks[id] = callback + return id method subscribeLogs(subscriptions: WebSocketSubscriptions, filter: EventFilter, @@ -167,16 +168,12 @@ method subscribeLogs(subscriptions: WebSocketSubscriptions, let res = Log.fromJson(arguments{"result"}).mapFailure(SubscriptionError) onLog(res) - try: + convertErrorsToSubscriptionError: withLock(subscriptions): - convertErrorsToSubscriptionError: - let id = await subscriptions.client.eth_subscribe("logs", filter) - subscriptions.callbacks[id] = callback - subscriptions.logFilters[id] = filter - return id - except AsyncLockError as e: - error "Lock error when trying to subscribe to logs", err = e.msg - raise newException(SubscriptionError, "Cannot subscribe to the logs because of lock error") + let id = await subscriptions.client.eth_subscribe("logs", filter) + subscriptions.callbacks[id] = callback + subscriptions.logFilters[id] = filter + return id method unsubscribe*(subscriptions: WebSocketSubscriptions, id: JsonNode) @@ -191,7 +188,7 @@ method unsubscribe*(subscriptions: WebSocketSubscriptions, # Ignore if uninstallation of the subscribiton fails. discard -method close*(subscriptions: WebsocketSubscriptions) {.async.} = +method close*(subscriptions: WebSocketSubscriptions) {.async: (raises: [CancelledError, SubscriptionError]).} = await procCall JsonRpcSubscriptions(subscriptions).close() when defined(ws_resubscribe): if not subscriptions.resubscribeFut.isNil: @@ -201,28 +198,31 @@ when defined(ws_resubscribe): # This is a workaround to manage the 5 minutes limit due to hardhat. # See https://github.com/NomicFoundation/hardhat/issues/2053#issuecomment-1061374064 proc resubscribeWebsocketEventsOnTimeout*(subscriptions: WebsocketSubscriptions) {.async: (raises: [CancelledError]).} = - while true: - await sleepAsync(WsResubscribe.seconds) - try: - withLock(subscriptions): - for id, callback in subscriptions.callbacks: + if subscriptions.resubscribeInterval <= 0: + info "Skipping the resubscription because the interval is zero or negative", period = subscriptions.resubscribeInterval + else: + while true: + await sleepAsync(subscriptions.resubscribeInterval.seconds) + try: + withLock(subscriptions): + for id, callback in subscriptions.callbacks: - var newId: JsonNode - if id in subscriptions.logFilters: - let filter = subscriptions.logFilters[id] - newId = await subscriptions.client.eth_subscribe("logs", filter) - subscriptions.logFilters[newId] = filter - subscriptions.logFilters.del(id) - else: - newId = await subscriptions.client.eth_subscribe("newHeads") + var newId: JsonNode + if id in subscriptions.logFilters: + let filter = subscriptions.logFilters[id] + newId = await subscriptions.client.eth_subscribe("logs", filter) + subscriptions.logFilters[newId] = filter + subscriptions.logFilters.del(id) + else: + newId = await subscriptions.client.eth_subscribe("newHeads") - subscriptions.callbacks[newId] = callback - subscriptions.callbacks.del(id) - discard await subscriptions.client.eth_unsubscribe(id) - except CancelledError as e: - raise e - except CatchableError as e: - error "WS resubscription failed" , error = e.msg + subscriptions.callbacks[newId] = callback + subscriptions.callbacks.del(id) + discard await subscriptions.client.eth_unsubscribe(id) + except CancelledError as e: + raise e + except CatchableError as e: + error "WS resubscription failed" , error = e.msg # Polling diff --git a/testmodule/providers/jsonrpc/testWsResubscription.nim b/testmodule/providers/jsonrpc/testWsResubscription.nim index b235887..545a6cf 100644 --- a/testmodule/providers/jsonrpc/testWsResubscription.nim +++ b/testmodule/providers/jsonrpc/testWsResubscription.nim @@ -13,11 +13,13 @@ suite "Websocket re-subscriptions": var subscriptions: JsonRpcSubscriptions var client: RpcWebSocketClient + var resubscribeInterval: int setup: + resubscribeInterval = 3 client = newRpcWebSocketClient() await client.connect("ws://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545")) - subscriptions = JsonRpcSubscriptions.new(client) + subscriptions = JsonRpcSubscriptions.new(client, resubscribeInterval = resubscribeInterval) subscriptions.start() teardown: @@ -28,15 +30,14 @@ suite "Websocket re-subscriptions": let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) let emptyHandler = proc(log: ?!Log) = discard - let subscription = await subscriptions.subscribeLogs(filter, emptyHandler) + for i in 1..10: + discard await subscriptions.subscribeLogs(filter, emptyHandler) # Wait until the re-subscription starts - await sleepAsync(3.int64.seconds) + await sleepAsync(resubscribeInterval.seconds) - try: - await subscriptions.unsubscribe(subscription) - except CatchableError: - fail() + # Attempt to modify callbacks while its being iterated + discard await subscriptions.subscribeLogs(filter, emptyHandler) test "resubscribe events take effect with new subscription IDs in the log filters": let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) @@ -46,7 +47,8 @@ suite "Websocket re-subscriptions": check id in subscriptions.logFilters check subscriptions.logFilters.len == 1 - await sleepAsync(4.int64.seconds) + # Make sure the subscription is done + await sleepAsync((resubscribeInterval + 1).seconds) # The previous subscription should not be in the log filters check not (id in subscriptions.logFilters) diff --git a/testmodule/providers/testJsonRpc.nim b/testmodule/providers/testJsonRpc.nim index 8e7de1d..9cac33c 100644 --- a/testmodule/providers/testJsonRpc.nim +++ b/testmodule/providers/testJsonRpc.nim @@ -1,6 +1,8 @@ import ./jsonrpc/testJsonRpcProvider import ./jsonrpc/testJsonRpcSigner import ./jsonrpc/testJsonRpcSubscriptions +when defined(ws_resubscribe): + import ./jsonrpc/testWsResubscription import ./jsonrpc/testConversions import ./jsonrpc/testErrors diff --git a/testmodule/test.nimble b/testmodule/test.nimble index 72db518..a3e9037 100644 --- a/testmodule/test.nimble +++ b/testmodule/test.nimble @@ -7,8 +7,4 @@ requires "asynctest >= 0.4.0 & < 0.5.0" task test, "Run the test suite": exec "nimble install -d -y" - exec "nim c -r test" - -task testWsResubscription, "Run the test suite": - exec "nimble install -d -y" - exec "nim c --define:ws_resubscribe=3 -r providers/jsonrpc/testWsResubscription" + exec "nim c --define:ws_resubscribe=0 -r test" \ No newline at end of file