Add resubscribe flag

This commit is contained in:
Arnaud 2025-03-20 15:45:18 +01:00
parent c48ff9b84d
commit 4c19e14853
No known key found for this signature in database
GPG Key ID: 69D6CE281FCAE663

View File

@ -24,11 +24,33 @@ type
# about them
# This is used of resubscribe all the subscriptions when using websocket with hardhat
logFilters: Table[JsonNode, EventFilter]
when defined(resubscribe):
resubscribeFut: Future[void]
MethodHandler* = proc (j: JsonNode) {.gcsafe, raises: [].}
SubscriptionCallback = proc(id: JsonNode, arguments: ?!JsonNode) {.gcsafe, raises:[].}
{.push raises:[].}
when defined(resubscribe):
# This is a workaround to manage the 5 minutes limit due to hardhat.
# See https://github.com/NomicFoundation/hardhat/issues/2053#issuecomment-1061374064
proc resubscribeWebsocketEventsOnTimeout*(subscriptions: JsonRpcSubscriptions) {.async.} =
while true:
await sleepAsync(4.int64.minutes)
for id, callback in subscriptions.callbacks:
var newId: JsonNode
if id in subscriptions.logFilters:
let filter = subscriptions.logFilters[id]
newId = await subscriptions.client.eth_subscribe("logs", filter)
subscriptions.logFilters[newId] = filter
subscriptions.logFilters.del(id)
else:
newId = await subscriptions.client.eth_subscribe("newHeads")
subscriptions.callbacks[newId] = callback
subscriptions.callbacks.del(id)
discard await subscriptions.client.eth_unsubscribe(id)
template convertErrorsToSubscriptionError(body) =
try:
body
@ -58,7 +80,6 @@ func start*(subscriptions: JsonRpcSubscriptions) =
# true = continue processing message using json_rpc's default message handler
return ok true
proc setMethodHandler(
subscriptions: JsonRpcSubscriptions,
`method`: string,
@ -89,6 +110,10 @@ method close*(subscriptions: JsonRpcSubscriptions) {.async: (raises: [Subscripti
for id in ids:
await subscriptions.unsubscribe(id)
when defined(resubscribe):
if not subscriptions.resubscribeFut.isNil:
await subscriptions.resubscribeFut.cancelAndWait()
proc getCallback(subscriptions: JsonRpcSubscriptions,
id: JsonNode): ?SubscriptionCallback {. raises:[].} =
try:
@ -110,6 +135,10 @@ proc new*(_: type JsonRpcSubscriptions,
if callback =? subscriptions.getCallback(id):
callback(id, success(arguments))
subscriptions.setMethodHandler("eth_subscription", subscriptionHandler)
when defined(resubscribe):
subscriptions.resubscribeFut = resubscribeWebsocketEventsOnTimeout(subscriptions)
subscriptions
method subscribeBlocks(subscriptions: WebSocketSubscriptions,