From 507ac6a4cc71cec9be7693fa393db4a49b52baf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Uhl=C3=AD=C5=99?= Date: Tue, 22 Oct 2024 15:57:25 +0200 Subject: [PATCH] fix(subscriptions): filter not found recreates polling filter (#78) --- ethers.nimble | 2 +- ethers/providers/jsonrpc/subscriptions.nim | 29 +++++++-- testmodule/providers/jsonrpc/rpc_mock.nim | 50 ++++++++++++++ .../jsonrpc/testJsonRpcSubscriptions.nim | 65 +++++++++++++++++++ 4 files changed, 141 insertions(+), 5 deletions(-) create mode 100644 testmodule/providers/jsonrpc/rpc_mock.nim diff --git a/ethers.nimble b/ethers.nimble index 8e2be56..0168c2d 100644 --- a/ethers.nimble +++ b/ethers.nimble @@ -15,6 +15,6 @@ requires "stew" requires "eth#c482b4c5b658a77cc96b49d4a397aa6d98472ac7" task test, "Run the test suite": - exec "nimble install -d -y" + # exec "nimble install -d -y" withDir "testmodule": exec "nimble test" diff --git a/ethers/providers/jsonrpc/subscriptions.nim b/ethers/providers/jsonrpc/subscriptions.nim index a3f0810..e2bc6cb 100644 --- a/ethers/providers/jsonrpc/subscriptions.nim +++ b/ethers/providers/jsonrpc/subscriptions.nim @@ -1,5 +1,6 @@ import std/tables import std/sequtils +import std/strutils import pkg/chronos import pkg/json_rpc/rpcclient import ../../basics @@ -132,16 +133,31 @@ type PollingSubscriptions = ref object of JsonRpcSubscriptions polling: Future[void] + # We need to keep around the filters that are used to create log filters on the RPC node + # as there might be a time when they need to be recreated as RPC node might prune/forget + # about them + filters: Table[JsonNode, EventFilter] + + # Used when filters are recreated to translate from the id that user + # originally got returned to new filter id + subscriptionMapping: Table[JsonNode, JsonNode] + proc new*(_: type JsonRpcSubscriptions, client: RpcHttpClient, pollingInterval = 4.seconds): JsonRpcSubscriptions = let subscriptions = PollingSubscriptions(client: client) - proc getChanges(id: JsonNode): Future[JsonNode] {.async.} = + proc getChanges(originalId: JsonNode): Future[JsonNode] {.async.} = try: - return await subscriptions.client.eth_getFilterChanges(id) - except CatchableError: + let mappedId = subscriptions.subscriptionMapping[originalId] + return await subscriptions.client.eth_getFilterChanges(mappedId) + except CatchableError as e: + if "filter not found" in e.msg: + let filter = subscriptions.filters[originalId] + let newId = await subscriptions.client.eth_newFilter(filter) + subscriptions.subscriptionMapping[originalId] = newId + return newJArray() proc poll(id: JsonNode) {.async.} = @@ -180,6 +196,7 @@ method subscribeBlocks(subscriptions: PollingSubscriptions, let id = await subscriptions.client.eth_newBlockFilter() subscriptions.callbacks[id] = callback + subscriptions.subscriptionMapping[id] = id return id method subscribeLogs(subscriptions: PollingSubscriptions, @@ -194,10 +211,14 @@ method subscribeLogs(subscriptions: PollingSubscriptions, let id = await subscriptions.client.eth_newFilter(filter) subscriptions.callbacks[id] = callback + subscriptions.filters[id] = filter + subscriptions.subscriptionMapping[id] = id return id method unsubscribe*(subscriptions: PollingSubscriptions, id: JsonNode) {.async.} = + discard await subscriptions.client.eth_uninstallFilter(subscriptions.subscriptionMapping[id]) + subscriptions.filters.del(id) subscriptions.callbacks.del(id) - discard await subscriptions.client.eth_uninstallFilter(id) + subscriptions.subscriptionMapping.del(id) diff --git a/testmodule/providers/jsonrpc/rpc_mock.nim b/testmodule/providers/jsonrpc/rpc_mock.nim new file mode 100644 index 0000000..d9cae74 --- /dev/null +++ b/testmodule/providers/jsonrpc/rpc_mock.nim @@ -0,0 +1,50 @@ +import ../../examples +import ../../../ethers/provider +import ../../../ethers/providers/jsonrpc/conversions + +import std/tables +import pkg/stew/byteutils +import pkg/json_rpc/rpcserver except `%`, `%*` +import pkg/json_rpc/errors + + +type MockRpcHttpServer* = ref object + filters*: Table[string, bool] + newFilterCounter*: int + srv: RpcHttpServer + +proc new*(_: type MockRpcHttpServer): MockRpcHttpServer = + MockRpcHttpServer(filters: initTable[string, bool](), newFilterCounter: 0, srv: newRpcHttpServer(["127.0.0.1:0"])) + +proc invalidateFilter*(server: MockRpcHttpServer, id: string) = + server.filters[id] = false + +proc start*(server: MockRpcHttpServer) = + server.srv.router.rpc("eth_newFilter") do(filter: EventFilter) -> string: + let filterId = "0x" & (array[16, byte].example).toHex + server.filters[filterId] = true + server.newFilterCounter += 1 + return filterId + + server.srv.router.rpc("eth_getFilterChanges") do(id: string) -> seq[string]: + if(not hasKey(server.filters, id) or not server.filters[id]): + raise (ref ApplicationError)(code: -32000, msg: "filter not found") + + return @[] + + server.srv.router.rpc("eth_uninstallFilter") do(id: string) -> bool: + if(not hasKey(server.filters, id)): + raise (ref ApplicationError)(code: -32000, msg: "filter not found") + + server.filters.del(id) + return true + + server.srv.start() + +proc stop*(server: MockRpcHttpServer) {.async.} = + await server.srv.stop() + await server.srv.closeWait() + + +proc localAddress*(server: MockRpcHttpServer): seq[TransportAddress] = + return server.srv.localAddress() diff --git a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim index a402774..d26f8d8 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim @@ -1,9 +1,15 @@ import std/json +import std/sequtils import pkg/asynctest +import pkg/serde import pkg/json_rpc/rpcclient +import pkg/json_rpc/rpcserver import ethers/provider import ethers/providers/jsonrpc/subscriptions +import ../../examples +import ./rpc_mock + suite "JsonRpcSubscriptions": test "can be instantiated with an http client": @@ -89,3 +95,62 @@ suite "HTTP polling subscriptions": await client.close() subscriptionTests(subscriptions, client) + +suite "HTTP polling subscriptions - filter not found": + + var subscriptions: JsonRpcSubscriptions + var client: RpcHttpClient + var mockServer: MockRpcHttpServer + + setup: + mockServer = MockRpcHttpServer.new() + mockServer.start() + + client = newRpcHttpClient() + await client.connect("http://" & $mockServer.localAddress()[0]) + + subscriptions = JsonRpcSubscriptions.new(client, + pollingInterval = 15.millis) + subscriptions.start() + + teardown: + await subscriptions.close() + await client.close() + await mockServer.stop() + + test "filter not found error recreates filter": + let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) + let emptyHandler = proc(log: Log) = discard + + check mockServer.newFilterCounter == 0 + let jsonId = await subscriptions.subscribeLogs(filter, emptyHandler) + let id = string.fromJson(jsonId).tryGet + check mockServer.newFilterCounter == 1 + + await sleepAsync(50.millis) + mockServer.invalidateFilter(id) + await sleepAsync(50.millis) + check mockServer.newFilterCounter == 2 + + test "recreated filter can be still unsubscribed using the original id": + let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) + let emptyHandler = proc(log: Log) = discard + + check mockServer.newFilterCounter == 0 + let jsonId = await subscriptions.subscribeLogs(filter, emptyHandler) + let id = string.fromJson(jsonId).tryGet + check mockServer.newFilterCounter == 1 + + await sleepAsync(50.millis) + mockServer.invalidateFilter(id) + check eventually mockServer.newFilterCounter == 2 + check mockServer.filters[id] == false + check mockServer.filters.len() == 2 + await subscriptions.unsubscribe(jsonId) + check mockServer.filters.len() == 1 + + # invalidateFilter sets the filter's value to false which will return the "filter not found" + # unsubscribing will actually delete the key from filters table + # hence after unsubscribing the only key left in the table should be the original id + for key in mockServer.filters.keys(): + check key == id