From c6a59b5187d81616203dc726d9e6f251f350f395 Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Mon, 11 Nov 2024 13:59:39 +0100 Subject: [PATCH] resubscribe when error in polling --- ethers/providers/jsonrpc/subscriptions.nim | 79 +++++++++++-------- .../jsonrpc/testJsonRpcSubscriptions.nim | 25 +++++- 2 files changed, 68 insertions(+), 36 deletions(-) diff --git a/ethers/providers/jsonrpc/subscriptions.nim b/ethers/providers/jsonrpc/subscriptions.nim index fc9dea9..17f59d6 100644 --- a/ethers/providers/jsonrpc/subscriptions.nim +++ b/ethers/providers/jsonrpc/subscriptions.nim @@ -152,47 +152,62 @@ proc new*(_: type JsonRpcSubscriptions, let subscriptions = PollingSubscriptions(client: client) - proc getChanges(originalId: JsonNode): Future[JsonNode] {.async.} = + proc resubscribe(id: JsonNode) {.async: (raises: [CancelledError]).} = try: - let mappedId = subscriptions.subscriptionMapping[originalId] - let changes = await subscriptions.client.eth_getFilterChanges(mappedId) - if changes.kind == JNull: - return newJArray() - elif changes.kind != JArray: - raise newException(SubscriptionError, - "HTTP polling: unexpected value returned from eth_getFilterChanges." & - " Expected: JArray, got: " & $changes.kind) - return changes - except CancelledError as e: - raise e - except CatchableError as e: - if "filter not found" in e.msg: - var newId: JsonNode - # Log filters are stored in logFilters, block filters are not persisted - # there is they do not need any specific data for their recreation. - # We use this to determine if the filter was log or block filter here. - if subscriptions.logFilters.hasKey(originalId): - let filter = subscriptions.logFilters[originalId] - newId = await subscriptions.client.eth_newFilter(filter) - else: - newId = await subscriptions.client.eth_newBlockFilter() - subscriptions.subscriptionMapping[originalId] = newId - return await getChanges(originalId) + var newId: JsonNode + # Log filters are stored in logFilters, block filters are not persisted + # there is they do not need any specific data for their recreation. + # We use this to determine if the filter was log or block filter here. + if subscriptions.logFilters.hasKey(id): + let filter = subscriptions.logFilters[id] + newId = await subscriptions.client.eth_newFilter(filter) else: - raise e + newId = await subscriptions.client.eth_newBlockFilter() + subscriptions.subscriptionMapping[id] = newId + except CancelledError as error: + raise error + except CatchableError: + # there's nothing further we can do here + discard - proc poll(id: JsonNode) {.async.} = + proc getChanges(id: JsonNode): Future[JsonNode] {.async: (raises: [CancelledError]).} = + try: + let mappedId = subscriptions.subscriptionMapping[id] + let changes = await subscriptions.client.eth_getFilterChanges(mappedId) + if changes.kind == JArray: + return changes + except KeyError as error: + raiseAssert "subscription mapping invalid: " & error.msg + except JsonRpcError: + await resubscribe(id) + # TODO: we could still miss some events between losing the subscription + # and resubscribing. We should probably adopt a strategy like ethers.js, + # whereby we keep track of the latest block number that we've seen + # filter changes for: + # https://github.com/ethers-io/ethers.js/blob/f97b92bbb1bde22fcc44100af78d7f31602863ab/packages/providers/src.ts/base-provider.ts#L977 + except CancelledError as error: + raise error + except CatchableError: + # there's nothing we can do here + discard + return newJArray() + + proc poll(id: JsonNode) {.async: (raises: [CancelledError]).} = for change in await getChanges(id): if callback =? subscriptions.getCallback(id): callback(id, change) - proc poll {.async.} = - untilCancelled: - for id in toSeq subscriptions.callbacks.keys: - await poll(id) - await sleepAsync(pollingInterval) + proc poll {.async: (raises: []).} = + try: + while true: + for id in toSeq subscriptions.callbacks.keys: + await poll(id) + await sleepAsync(pollingInterval) + except CancelledError: + discard subscriptions.polling = poll() + asyncSpawn subscriptions.polling subscriptions method close*(subscriptions: PollingSubscriptions) {.async.} = diff --git a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim index 5f96ce3..d20a152 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim @@ -1,4 +1,3 @@ -import pkg/serde import std/os import std/sequtils import std/importutils @@ -106,13 +105,18 @@ suite "HTTP polling subscriptions - filter not found": privateAccess(PollingSubscriptions) - setup: + proc startServer() {.async.} = mockServer = MockRpcHttpServer.new() mockServer.start() - - client = newRpcHttpClient() await client.connect("http://" & $mockServer.localAddress()[0]) + proc stopServer() {.async.} = + await mockServer.stop() + + setup: + client = newRpcHttpClient() + await startServer() + subscriptions = PollingSubscriptions( JsonRpcSubscriptions.new( client, @@ -174,3 +178,16 @@ suite "HTTP polling subscriptions - filter not found": await subscriptions.unsubscribe(id) check not subscriptions.subscriptionMapping.hasKey id + + test "polling continues with new filter after temporary error": + let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) + let emptyHandler = proc(log: Log) = discard + + let id = await subscriptions.subscribeLogs(filter, emptyHandler) + + await stopServer() + mockServer.invalidateFilter(id) + await sleepAsync(50.milliseconds) + await startServer() + + check eventually subscriptions.subscriptionMapping[id] != id