resubscribe when error in polling

This commit is contained in:
Mark Spanbroek 2024-11-11 13:59:39 +01:00 committed by markspanbroek
parent 5a9895b792
commit c6a59b5187
2 changed files with 68 additions and 36 deletions

View File

@ -152,47 +152,62 @@ proc new*(_: type JsonRpcSubscriptions,
let subscriptions = PollingSubscriptions(client: client) let subscriptions = PollingSubscriptions(client: client)
proc getChanges(originalId: JsonNode): Future[JsonNode] {.async.} = proc resubscribe(id: JsonNode) {.async: (raises: [CancelledError]).} =
try: try:
let mappedId = subscriptions.subscriptionMapping[originalId] var newId: JsonNode
let changes = await subscriptions.client.eth_getFilterChanges(mappedId) # Log filters are stored in logFilters, block filters are not persisted
if changes.kind == JNull: # there is they do not need any specific data for their recreation.
return newJArray() # We use this to determine if the filter was log or block filter here.
elif changes.kind != JArray: if subscriptions.logFilters.hasKey(id):
raise newException(SubscriptionError, let filter = subscriptions.logFilters[id]
"HTTP polling: unexpected value returned from eth_getFilterChanges." & newId = await subscriptions.client.eth_newFilter(filter)
" Expected: JArray, got: " & $changes.kind)
return changes
except CancelledError as e:
raise e
except CatchableError as e:
if "filter not found" in e.msg:
var newId: JsonNode
# Log filters are stored in logFilters, block filters are not persisted
# there is they do not need any specific data for their recreation.
# We use this to determine if the filter was log or block filter here.
if subscriptions.logFilters.hasKey(originalId):
let filter = subscriptions.logFilters[originalId]
newId = await subscriptions.client.eth_newFilter(filter)
else:
newId = await subscriptions.client.eth_newBlockFilter()
subscriptions.subscriptionMapping[originalId] = newId
return await getChanges(originalId)
else: else:
raise e newId = await subscriptions.client.eth_newBlockFilter()
subscriptions.subscriptionMapping[id] = newId
except CancelledError as error:
raise error
except CatchableError:
# there's nothing further we can do here
discard
proc poll(id: JsonNode) {.async.} = proc getChanges(id: JsonNode): Future[JsonNode] {.async: (raises: [CancelledError]).} =
try:
let mappedId = subscriptions.subscriptionMapping[id]
let changes = await subscriptions.client.eth_getFilterChanges(mappedId)
if changes.kind == JArray:
return changes
except KeyError as error:
raiseAssert "subscription mapping invalid: " & error.msg
except JsonRpcError:
await resubscribe(id)
# TODO: we could still miss some events between losing the subscription
# and resubscribing. We should probably adopt a strategy like ethers.js,
# whereby we keep track of the latest block number that we've seen
# filter changes for:
# https://github.com/ethers-io/ethers.js/blob/f97b92bbb1bde22fcc44100af78d7f31602863ab/packages/providers/src.ts/base-provider.ts#L977
except CancelledError as error:
raise error
except CatchableError:
# there's nothing we can do here
discard
return newJArray()
proc poll(id: JsonNode) {.async: (raises: [CancelledError]).} =
for change in await getChanges(id): for change in await getChanges(id):
if callback =? subscriptions.getCallback(id): if callback =? subscriptions.getCallback(id):
callback(id, change) callback(id, change)
proc poll {.async.} = proc poll {.async: (raises: []).} =
untilCancelled: try:
for id in toSeq subscriptions.callbacks.keys: while true:
await poll(id) for id in toSeq subscriptions.callbacks.keys:
await sleepAsync(pollingInterval) await poll(id)
await sleepAsync(pollingInterval)
except CancelledError:
discard
subscriptions.polling = poll() subscriptions.polling = poll()
asyncSpawn subscriptions.polling
subscriptions subscriptions
method close*(subscriptions: PollingSubscriptions) {.async.} = method close*(subscriptions: PollingSubscriptions) {.async.} =

View File

@ -1,4 +1,3 @@
import pkg/serde
import std/os import std/os
import std/sequtils import std/sequtils
import std/importutils import std/importutils
@ -106,13 +105,18 @@ suite "HTTP polling subscriptions - filter not found":
privateAccess(PollingSubscriptions) privateAccess(PollingSubscriptions)
setup: proc startServer() {.async.} =
mockServer = MockRpcHttpServer.new() mockServer = MockRpcHttpServer.new()
mockServer.start() mockServer.start()
client = newRpcHttpClient()
await client.connect("http://" & $mockServer.localAddress()[0]) await client.connect("http://" & $mockServer.localAddress()[0])
proc stopServer() {.async.} =
await mockServer.stop()
setup:
client = newRpcHttpClient()
await startServer()
subscriptions = PollingSubscriptions( subscriptions = PollingSubscriptions(
JsonRpcSubscriptions.new( JsonRpcSubscriptions.new(
client, client,
@ -174,3 +178,16 @@ suite "HTTP polling subscriptions - filter not found":
await subscriptions.unsubscribe(id) await subscriptions.unsubscribe(id)
check not subscriptions.subscriptionMapping.hasKey id check not subscriptions.subscriptionMapping.hasKey id
test "polling continues with new filter after temporary error":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: Log) = discard
let id = await subscriptions.subscribeLogs(filter, emptyHandler)
await stopServer()
mockServer.invalidateFilter(id)
await sleepAsync(50.milliseconds)
await startServer()
check eventually subscriptions.subscriptionMapping[id] != id