diff --git a/testmodule/providers/jsonrpc/rpc_mock.nim b/testmodule/providers/jsonrpc/rpc_mock.nim index b4411e7..49128e5 100644 --- a/testmodule/providers/jsonrpc/rpc_mock.nim +++ b/testmodule/providers/jsonrpc/rpc_mock.nim @@ -3,41 +3,45 @@ import ../../../ethers/provider import ../../../ethers/providers/jsonrpc/conversions import std/tables -import std/sequtils import pkg/stew/byteutils import pkg/json_rpc/rpcserver except `%`, `%*` import pkg/json_rpc/errors type MockRpcHttpServer* = ref object - filters*: seq[string] + filterChanges*: Table[string, seq[Log]] srv: RpcHttpServer proc new*(_: type MockRpcHttpServer): MockRpcHttpServer = let srv = newRpcHttpServer(["127.0.0.1:0"]) - MockRpcHttpServer(filters: @[], srv: srv) + MockRpcHttpServer(filterChanges: initTable[string, seq[Log]](), srv: srv) proc invalidateFilter*(server: MockRpcHttpServer, jsonId: JsonNode) = - server.filters.keepItIf it != jsonId.getStr + trace "invalidating filter", id = jsonId.getStr + server.filterChanges.del jsonId.getStr + +proc addFilterChange*(server: MockRpcHttpServer, jsonId: JsonNode, change: Log) = + server.filterChanges[jsonId.getStr].add change proc start*(server: MockRpcHttpServer) = server.srv.router.rpc("eth_newFilter") do(filter: EventFilter) -> string: let filterId = "0x" & (array[16, byte].example).toHex - server.filters.add filterId + server.filterChanges[filterId] = @[] return filterId server.srv.router.rpc("eth_newBlockFilter") do() -> string: let filterId = "0x" & (array[16, byte].example).toHex - server.filters.add filterId + server.filterChanges[filterId] = @[] return filterId - server.srv.router.rpc("eth_getFilterChanges") do(id: string) -> seq[string]: - if id notin server.filters: + server.srv.router.rpc("eth_getFilterChanges") do(id: string) -> seq[Log]: + if id notin server.filterChanges: + trace "Raising filter not found error", id raise (ref ApplicationError)(code: -32000, msg: "filter not found") - return @[] + return server.filterChanges[id] server.srv.router.rpc("eth_uninstallFilter") do(id: string) -> bool: - if id notin server.filters: + if id notin server.filterChanges: raise (ref ApplicationError)(code: -32000, msg: "filter not found") server.invalidateFilter(%id) diff --git a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim index 5f96ce3..ad60084 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim @@ -2,10 +2,9 @@ import pkg/serde import std/os import std/sequtils import std/importutils -import pkg/asynctest -import pkg/serde -import pkg/json_rpc/rpcclient -import pkg/json_rpc/rpcserver +import pkg/asynctest/chronos/unittest +import pkg/json_rpc/rpcclient except `%`, `%*`, toJson +import pkg/json_rpc/rpcserver except `%`, `%*`, toJson import ethers/provider import ethers/providers/jsonrpc/subscriptions @@ -103,6 +102,7 @@ suite "HTTP polling subscriptions - filter not found": var subscriptions: PollingSubscriptions var client: RpcHttpClient var mockServer: MockRpcHttpServer + var url: string privateAccess(PollingSubscriptions) @@ -111,7 +111,8 @@ suite "HTTP polling subscriptions - filter not found": mockServer.start() client = newRpcHttpClient() - await client.connect("http://" & $mockServer.localAddress()[0]) + url = "http://" & $mockServer.localAddress()[0] + await client.connect(url) subscriptions = PollingSubscriptions( JsonRpcSubscriptions.new( @@ -124,6 +125,30 @@ suite "HTTP polling subscriptions - filter not found": await client.close() await mockServer.stop() + test "polling loop is kept alive after disconnection": + let log = Log(blockNumber: 999999.u256) + + var latestBlock = 0.u256 + proc callback(log: Log) = + trace "GOT LOG IN CALLBACK", number = log.blockNumber + latestBlock = log.blockNumber + + let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) + let emptyHandler = proc(log: Log) = discard + + let id = await subscriptions.subscribeLogs(filter, callback) + # simulate failed requests by connecting to an invalid port + await client.connect("http://127.0.0.1:1") + await sleepAsync(5.millis) # wait for polling loop + trace "STARTING MOCK SERVER" + await client.connect(url) + trace "ADDING FILTER CHANGE" + mockServer.addFilterChange(id, log) + + check eventually(latestBlock == log.blockNumber, timeout=200) + + await subscriptions.unsubscribe(id) + test "filter not found error recreates log filter": let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example]) let emptyHandler = proc(log: Log) = discard