Handle the concurrency issues when updating the logFilters and add tests

This commit is contained in:
Arnaud 2025-03-27 09:00:12 +01:00
parent f06c3a785e
commit 266ef540e8
No known key found for this signature in database
GPG Key ID: 69D6CE281FCAE663
5 changed files with 122 additions and 36 deletions

View File

@ -209,7 +209,7 @@ Hardhat websockets workaround
If you're working with Hardhat, you might encounter an issue where [websocket subscriptions stop working after 5 minutes](https://github.com/NomicFoundation/hardhat/issues/2053).
This library provides a workaround using the `--define:ws_resubscribe` option. When this symbol is defined, the subscriptions will automatically resubscribe after 4 minutes.
This library provides a workaround using the `--define:ws_resubscribe` option. When this symbol is defined, the subscriptions will automatically resubscribe after 240 seconds (4 minutes) by default. You can change this value using `--define:ws_resubscribe=180`.
Contribution
------------

View File

@ -18,3 +18,4 @@ task test, "Run the test suite":
# exec "nimble install -d -y"
withDir "testmodule":
exec "nimble test"
exec "nimble testWsResubscription"

View File

@ -14,16 +14,18 @@ import ./conversions
export serde
# Default re-subscription period is 240 seconds (4 minutes)
const WsResubscribe {.intdefine.}: int64 = 240
type
JsonRpcSubscriptions* = ref object of RootObj
client: RpcClient
callbacks: Table[JsonNode, SubscriptionCallback]
methodHandlers: Table[string, MethodHandler]
# Used by both PollingSubscriptions and WebsocketSubscriptions to store
# subscription filters so the subscriptions can be recreated. With
# PollingSubscriptions, the RPC node might prune/forget about them, and with
# WebsocketSubscriptions, when using hardhat, subscriptions are dropped after 5
# minutes.
# We need to keep around the filters that are used to create log filters on the RPC node
# as there might be a time when they need to be recreated as RPC node might prune/forget
# about them
# This is used of resubscribe all the subscriptions when using websocket with hardhat
logFilters: Table[JsonNode, EventFilter]
when defined(ws_resubscribe):
resubscribeFut: Future[void]
@ -32,26 +34,6 @@ type
{.push raises:[].}
when defined(ws_resubscribe):
# This is a workaround to manage the 5 minutes limit due to hardhat.
# See https://github.com/NomicFoundation/hardhat/issues/2053#issuecomment-1061374064
proc resubscribeWebsocketEventsOnTimeout*(subscriptions: JsonRpcSubscriptions) {.async.} =
while true:
await sleepAsync(4.int64.minutes)
for id, callback in subscriptions.callbacks:
var newId: JsonNode
if id in subscriptions.logFilters:
let filter = subscriptions.logFilters[id]
newId = await subscriptions.client.eth_subscribe("logs", filter)
subscriptions.logFilters[newId] = filter
subscriptions.logFilters.del(id)
else:
newId = await subscriptions.client.eth_subscribe("newHeads")
subscriptions.callbacks[newId] = callback
subscriptions.callbacks.del(id)
discard await subscriptions.client.eth_unsubscribe(id)
template convertErrorsToSubscriptionError(body) =
try:
body
@ -122,6 +104,7 @@ proc getCallback(subscriptions: JsonRpcSubscriptions,
type
WebSocketSubscriptions = ref object of JsonRpcSubscriptions
logFiltersLock: AsyncLock
proc new*(_: type JsonRpcSubscriptions,
client: RpcWebSocketClient): JsonRpcSubscriptions =
@ -138,6 +121,16 @@ proc new*(_: type JsonRpcSubscriptions,
subscriptions
template withLock*(subscriptions: WebSocketSubscriptions, body: untyped) =
if subscriptions.logFiltersLock.isNil:
subscriptions.logFiltersLock = newAsyncLock()
await subscriptions.logFiltersLock.acquire()
try:
body
finally:
subscriptions.logFiltersLock.release()
method subscribeBlocks(subscriptions: WebSocketSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
@ -168,18 +161,24 @@ method subscribeLogs(subscriptions: WebSocketSubscriptions,
let res = Log.fromJson(arguments{"result"}).mapFailure(SubscriptionError)
onLog(res)
convertErrorsToSubscriptionError:
let id = await subscriptions.client.eth_subscribe("logs", filter)
subscriptions.callbacks[id] = callback
subscriptions.logFilters[id] = filter
return id
try:
withLock(subscriptions):
convertErrorsToSubscriptionError:
let id = await subscriptions.client.eth_subscribe("logs", filter)
subscriptions.callbacks[id] = callback
subscriptions.logFilters[id] = filter
return id
except AsyncLockError as e:
error "Lock error when trying to subscribe to logs", err = e.msg
raise newException(SubscriptionError, "Cannot subscribe to the logs because of lock error")
method unsubscribe*(subscriptions: WebSocketSubscriptions,
id: JsonNode)
{.async: (raises: [CancelledError]).} =
try:
subscriptions.callbacks.del(id)
discard await subscriptions.client.eth_unsubscribe(id)
withLock(subscriptions):
subscriptions.callbacks.del(id)
discard await subscriptions.client.eth_unsubscribe(id)
except CancelledError as e:
raise e
except CatchableError:
@ -188,9 +187,37 @@ method unsubscribe*(subscriptions: WebSocketSubscriptions,
method close*(subscriptions: WebsocketSubscriptions) {.async.} =
await procCall JsonRpcSubscriptions(subscriptions).close()
if not subscriptions.resubscribeFut.isNil:
await subscriptions.resubscribeFut.cancelAndWait()
when defined(ws_resubscribe):
if not subscriptions.resubscribeFut.isNil:
await subscriptions.resubscribeFut.cancelAndWait()
when defined(ws_resubscribe):
# This is a workaround to manage the 5 minutes limit due to hardhat.
# See https://github.com/NomicFoundation/hardhat/issues/2053#issuecomment-1061374064
proc resubscribeWebsocketEventsOnTimeout*(subscriptions: WebsocketSubscriptions) {.async: (raises: [CancelledError]).} =
while true:
await sleepAsync(WsResubscribe.seconds)
try:
withLock(subscriptions):
for id, callback in subscriptions.callbacks:
var newId: JsonNode
if id in subscriptions.logFilters:
let filter = subscriptions.logFilters[id]
newId = await subscriptions.client.eth_subscribe("logs", filter)
subscriptions.logFilters[newId] = filter
subscriptions.logFilters.del(id)
else:
newId = await subscriptions.client.eth_subscribe("newHeads")
subscriptions.callbacks[newId] = callback
subscriptions.callbacks.del(id)
discard await subscriptions.client.eth_unsubscribe(id)
except CancelledError as e:
raise e
except CatchableError as e:
error "WS resubscription failed" , error = e.msg
# Polling
type

View File

@ -0,0 +1,54 @@
import std/os
import std/importutils
import pkg/asynctest
import pkg/json_rpc/rpcclient
import ethers/provider
import ethers/providers/jsonrpc/subscriptions
import ../../examples
import ./rpc_mock
suite "Web socket re-subscriptions":
privateAccess(JsonRpcSubscriptions)
var subscriptions: JsonRpcSubscriptions
var client: RpcWebSocketClient
setup:
client = newRpcWebSocketClient()
await client.connect("ws://" & getEnv("ETHERS_TEST_PROVIDER", "localhost:8545"))
subscriptions = JsonRpcSubscriptions.new(client)
subscriptions.start()
teardown:
await subscriptions.close()
await client.close()
test "unsubscribing from a log filter while subscriptions are being resubscribed does not cause a concurrency error.":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: ?!Log) = discard
let subscription = await subscriptions.subscribeLogs(filter, emptyHandler)
await sleepAsync(3000.int64.milliseconds)
try:
await subscriptions.unsubscribe(subscription)
except CatchableError:
fail()
test "resubscribe events take effect with new subscription IDs in the log filters":
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
let emptyHandler = proc(log: ?!Log) = discard
let id = await subscriptions.subscribeLogs(filter, emptyHandler)
check id in subscriptions.logFilters
check subscriptions.logFilters.len == 1
await sleepAsync(4.int64.seconds)
# The previous subscription should not be in the log filters
check not (id in subscriptions.logFilters)
# There is still one subscription which is the new one
check subscriptions.logFilters.len == 1

View File

@ -8,3 +8,7 @@ requires "asynctest >= 0.4.0 & < 0.5.0"
task test, "Run the test suite":
exec "nimble install -d -y"
exec "nim c -r test"
task testWsResubscription, "Run the test suite":
exec "nimble install -d -y"
exec "nim c --define:ws_resubscribe=3 -r providers/jsonrpc/testWsResubscription"