fix(subscriptions): filter not found recreates polling filter (#78)
This commit is contained in:
parent
53e596e75a
commit
507ac6a4cc
|
@ -15,6 +15,6 @@ requires "stew"
|
|||
requires "eth#c482b4c5b658a77cc96b49d4a397aa6d98472ac7"
|
||||
|
||||
task test, "Run the test suite":
|
||||
exec "nimble install -d -y"
|
||||
# exec "nimble install -d -y"
|
||||
withDir "testmodule":
|
||||
exec "nimble test"
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import std/tables
|
||||
import std/sequtils
|
||||
import std/strutils
|
||||
import pkg/chronos
|
||||
import pkg/json_rpc/rpcclient
|
||||
import ../../basics
|
||||
|
@ -132,16 +133,31 @@ type
|
|||
PollingSubscriptions = ref object of JsonRpcSubscriptions
|
||||
polling: Future[void]
|
||||
|
||||
# We need to keep around the filters that are used to create log filters on the RPC node
|
||||
# as there might be a time when they need to be recreated as RPC node might prune/forget
|
||||
# about them
|
||||
filters: Table[JsonNode, EventFilter]
|
||||
|
||||
# Used when filters are recreated to translate from the id that user
|
||||
# originally got returned to new filter id
|
||||
subscriptionMapping: Table[JsonNode, JsonNode]
|
||||
|
||||
proc new*(_: type JsonRpcSubscriptions,
|
||||
client: RpcHttpClient,
|
||||
pollingInterval = 4.seconds): JsonRpcSubscriptions =
|
||||
|
||||
let subscriptions = PollingSubscriptions(client: client)
|
||||
|
||||
proc getChanges(id: JsonNode): Future[JsonNode] {.async.} =
|
||||
proc getChanges(originalId: JsonNode): Future[JsonNode] {.async.} =
|
||||
try:
|
||||
return await subscriptions.client.eth_getFilterChanges(id)
|
||||
except CatchableError:
|
||||
let mappedId = subscriptions.subscriptionMapping[originalId]
|
||||
return await subscriptions.client.eth_getFilterChanges(mappedId)
|
||||
except CatchableError as e:
|
||||
if "filter not found" in e.msg:
|
||||
let filter = subscriptions.filters[originalId]
|
||||
let newId = await subscriptions.client.eth_newFilter(filter)
|
||||
subscriptions.subscriptionMapping[originalId] = newId
|
||||
|
||||
return newJArray()
|
||||
|
||||
proc poll(id: JsonNode) {.async.} =
|
||||
|
@ -180,6 +196,7 @@ method subscribeBlocks(subscriptions: PollingSubscriptions,
|
|||
|
||||
let id = await subscriptions.client.eth_newBlockFilter()
|
||||
subscriptions.callbacks[id] = callback
|
||||
subscriptions.subscriptionMapping[id] = id
|
||||
return id
|
||||
|
||||
method subscribeLogs(subscriptions: PollingSubscriptions,
|
||||
|
@ -194,10 +211,14 @@ method subscribeLogs(subscriptions: PollingSubscriptions,
|
|||
|
||||
let id = await subscriptions.client.eth_newFilter(filter)
|
||||
subscriptions.callbacks[id] = callback
|
||||
subscriptions.filters[id] = filter
|
||||
subscriptions.subscriptionMapping[id] = id
|
||||
return id
|
||||
|
||||
method unsubscribe*(subscriptions: PollingSubscriptions,
|
||||
id: JsonNode)
|
||||
{.async.} =
|
||||
discard await subscriptions.client.eth_uninstallFilter(subscriptions.subscriptionMapping[id])
|
||||
subscriptions.filters.del(id)
|
||||
subscriptions.callbacks.del(id)
|
||||
discard await subscriptions.client.eth_uninstallFilter(id)
|
||||
subscriptions.subscriptionMapping.del(id)
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
import ../../examples
|
||||
import ../../../ethers/provider
|
||||
import ../../../ethers/providers/jsonrpc/conversions
|
||||
|
||||
import std/tables
|
||||
import pkg/stew/byteutils
|
||||
import pkg/json_rpc/rpcserver except `%`, `%*`
|
||||
import pkg/json_rpc/errors
|
||||
|
||||
|
||||
type MockRpcHttpServer* = ref object
|
||||
filters*: Table[string, bool]
|
||||
newFilterCounter*: int
|
||||
srv: RpcHttpServer
|
||||
|
||||
proc new*(_: type MockRpcHttpServer): MockRpcHttpServer =
|
||||
MockRpcHttpServer(filters: initTable[string, bool](), newFilterCounter: 0, srv: newRpcHttpServer(["127.0.0.1:0"]))
|
||||
|
||||
proc invalidateFilter*(server: MockRpcHttpServer, id: string) =
|
||||
server.filters[id] = false
|
||||
|
||||
proc start*(server: MockRpcHttpServer) =
|
||||
server.srv.router.rpc("eth_newFilter") do(filter: EventFilter) -> string:
|
||||
let filterId = "0x" & (array[16, byte].example).toHex
|
||||
server.filters[filterId] = true
|
||||
server.newFilterCounter += 1
|
||||
return filterId
|
||||
|
||||
server.srv.router.rpc("eth_getFilterChanges") do(id: string) -> seq[string]:
|
||||
if(not hasKey(server.filters, id) or not server.filters[id]):
|
||||
raise (ref ApplicationError)(code: -32000, msg: "filter not found")
|
||||
|
||||
return @[]
|
||||
|
||||
server.srv.router.rpc("eth_uninstallFilter") do(id: string) -> bool:
|
||||
if(not hasKey(server.filters, id)):
|
||||
raise (ref ApplicationError)(code: -32000, msg: "filter not found")
|
||||
|
||||
server.filters.del(id)
|
||||
return true
|
||||
|
||||
server.srv.start()
|
||||
|
||||
proc stop*(server: MockRpcHttpServer) {.async.} =
|
||||
await server.srv.stop()
|
||||
await server.srv.closeWait()
|
||||
|
||||
|
||||
proc localAddress*(server: MockRpcHttpServer): seq[TransportAddress] =
|
||||
return server.srv.localAddress()
|
|
@ -1,9 +1,15 @@
|
|||
import std/json
|
||||
import std/sequtils
|
||||
import pkg/asynctest
|
||||
import pkg/serde
|
||||
import pkg/json_rpc/rpcclient
|
||||
import pkg/json_rpc/rpcserver
|
||||
import ethers/provider
|
||||
import ethers/providers/jsonrpc/subscriptions
|
||||
|
||||
import ../../examples
|
||||
import ./rpc_mock
|
||||
|
||||
suite "JsonRpcSubscriptions":
|
||||
|
||||
test "can be instantiated with an http client":
|
||||
|
@ -89,3 +95,62 @@ suite "HTTP polling subscriptions":
|
|||
await client.close()
|
||||
|
||||
subscriptionTests(subscriptions, client)
|
||||
|
||||
suite "HTTP polling subscriptions - filter not found":
|
||||
|
||||
var subscriptions: JsonRpcSubscriptions
|
||||
var client: RpcHttpClient
|
||||
var mockServer: MockRpcHttpServer
|
||||
|
||||
setup:
|
||||
mockServer = MockRpcHttpServer.new()
|
||||
mockServer.start()
|
||||
|
||||
client = newRpcHttpClient()
|
||||
await client.connect("http://" & $mockServer.localAddress()[0])
|
||||
|
||||
subscriptions = JsonRpcSubscriptions.new(client,
|
||||
pollingInterval = 15.millis)
|
||||
subscriptions.start()
|
||||
|
||||
teardown:
|
||||
await subscriptions.close()
|
||||
await client.close()
|
||||
await mockServer.stop()
|
||||
|
||||
test "filter not found error recreates filter":
|
||||
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
|
||||
let emptyHandler = proc(log: Log) = discard
|
||||
|
||||
check mockServer.newFilterCounter == 0
|
||||
let jsonId = await subscriptions.subscribeLogs(filter, emptyHandler)
|
||||
let id = string.fromJson(jsonId).tryGet
|
||||
check mockServer.newFilterCounter == 1
|
||||
|
||||
await sleepAsync(50.millis)
|
||||
mockServer.invalidateFilter(id)
|
||||
await sleepAsync(50.millis)
|
||||
check mockServer.newFilterCounter == 2
|
||||
|
||||
test "recreated filter can be still unsubscribed using the original id":
|
||||
let filter = EventFilter(address: Address.example, topics: @[array[32, byte].example])
|
||||
let emptyHandler = proc(log: Log) = discard
|
||||
|
||||
check mockServer.newFilterCounter == 0
|
||||
let jsonId = await subscriptions.subscribeLogs(filter, emptyHandler)
|
||||
let id = string.fromJson(jsonId).tryGet
|
||||
check mockServer.newFilterCounter == 1
|
||||
|
||||
await sleepAsync(50.millis)
|
||||
mockServer.invalidateFilter(id)
|
||||
check eventually mockServer.newFilterCounter == 2
|
||||
check mockServer.filters[id] == false
|
||||
check mockServer.filters.len() == 2
|
||||
await subscriptions.unsubscribe(jsonId)
|
||||
check mockServer.filters.len() == 1
|
||||
|
||||
# invalidateFilter sets the filter's value to false which will return the "filter not found"
|
||||
# unsubscribing will actually delete the key from filters table
|
||||
# hence after unsubscribing the only key left in the table should be the original id
|
||||
for key in mockServer.filters.keys():
|
||||
check key == id
|
||||
|
|
Loading…
Reference in New Issue