nim-ethers/ethers/providers/jsonrpc/subscriptions.nim
Eric b68bea9909
fix: modify unsubscribe cleanup routine and tests (#84)
* fix: modify unsubscribe cleanup routine

Ignore exceptions (other than CancelledError) if uninstallation of the filter fails. If it's the last step in the subscription cleanup, then filter changes for this filter will no longer be polled so if the filter continues to live on in geth for whatever reason, then it doesn't matter.

This includes a number of fixes:
- `CancelledError` is now caught inside of `getChanges`. This was causing conditions during `subscriptions.close`, where the `CancelledError` would get consumed by the `except CatchableError`, if there was an ongoing `poll` happening at the time of close.
- After creating a new filter inside of `getChanges`, the new filter is polled for changes before returning.
- `getChanges` also does not swallow `CatchableError` by returning an empty array, and instead re-raises the error if it is not `filter not found`.
- The tests were simplified by accessing the private fields of `PollingSubscriptions`. That way, there wasn't a race condition for the `newFilterId` counter inside of the mock.
- The `MockRpcHttpServer` was simplified by keeping track of the active filters only, and invalidation simply removes the filter. The tests then only needed to rely on the fact that the filter id changed in the mapping.
- Because of the above changes, we no longer needed to sleep inside of the tests, so the sleeps were removed, and the polling interval could be changed to 1ms, which not only makes the tests faster, but would further highlight any race conditions if present.

* docs: rpc custom port documentation

---------

Co-authored-by: Adam Uhlíř <adam@uhlir.dev>
2024-10-25 14:58:45 +11:00

246 lines
8.2 KiB
Nim

import std/tables
import std/sequtils
import std/strutils
import pkg/chronos
import pkg/json_rpc/rpcclient
import ../../basics
import ../../provider
include ../../nimshims/hashes
import ./rpccalls
import ./conversions
import ./looping
type
JsonRpcSubscriptions* = ref object of RootObj
client: RpcClient
callbacks: Table[JsonNode, SubscriptionCallback]
methodHandlers: Table[string, MethodHandler]
MethodHandler* = proc (j: JsonNode) {.gcsafe, raises: [].}
SubscriptionCallback = proc(id, arguments: JsonNode) {.gcsafe, raises:[].}
SubscriptionError* = object of EthersError
{.push raises:[].}
template `or`(a: JsonNode, b: typed): JsonNode =
if a.isNil: b else: a
func start*(subscriptions: JsonRpcSubscriptions) =
subscriptions.client.onProcessMessage =
proc(client: RpcClient,
line: string): Result[bool, string] {.gcsafe, raises: [].} =
if json =? JsonNode.fromJson(line):
if "method" in json:
let methodName = json{"method"}.getStr()
if methodName in subscriptions.methodHandlers:
let handler = subscriptions.methodHandlers.getOrDefault(methodName)
if not handler.isNil:
handler(json{"params"} or newJArray())
# false = do not continue processing message using json_rpc's
# default processing handler
return ok false
# true = continue processing message using json_rpc's default message handler
return ok true
proc setMethodHandler(
subscriptions: JsonRpcSubscriptions,
`method`: string,
handler: MethodHandler
) =
subscriptions.methodHandlers[`method`] = handler
method subscribeBlocks*(subscriptions: JsonRpcSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async, base.} =
raiseAssert "not implemented"
method subscribeLogs*(subscriptions: JsonRpcSubscriptions,
filter: EventFilter,
onLog: LogHandler):
Future[JsonNode]
{.async, base.} =
raiseAssert "not implemented"
method unsubscribe*(subscriptions: JsonRpcSubscriptions,
id: JsonNode)
{.async, base.} =
raiseAssert "not implemented"
method close*(subscriptions: JsonRpcSubscriptions) {.async, base.} =
let ids = toSeq subscriptions.callbacks.keys
for id in ids:
await subscriptions.unsubscribe(id)
proc getCallback(subscriptions: JsonRpcSubscriptions,
id: JsonNode): ?SubscriptionCallback =
try:
if not id.isNil and id in subscriptions.callbacks:
subscriptions.callbacks[id].some
else:
SubscriptionCallback.none
except KeyError:
SubscriptionCallback.none
# Web sockets
type
WebSocketSubscriptions = ref object of JsonRpcSubscriptions
proc new*(_: type JsonRpcSubscriptions,
client: RpcWebSocketClient): JsonRpcSubscriptions =
let subscriptions = WebSocketSubscriptions(client: client)
proc subscriptionHandler(arguments: JsonNode) {.raises:[].} =
let id = arguments{"subscription"} or newJString("")
if callback =? subscriptions.getCallback(id):
callback(id, arguments)
subscriptions.setMethodHandler("eth_subscription", subscriptionHandler)
subscriptions
method subscribeBlocks(subscriptions: WebSocketSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async.} =
proc callback(id, arguments: JsonNode) {.raises: [].} =
if blck =? Block.fromJson(arguments{"result"}):
onBlock(blck)
let id = await subscriptions.client.eth_subscribe("newHeads")
subscriptions.callbacks[id] = callback
return id
method subscribeLogs(subscriptions: WebSocketSubscriptions,
filter: EventFilter,
onLog: LogHandler):
Future[JsonNode]
{.async.} =
proc callback(id, arguments: JsonNode) =
if log =? Log.fromJson(arguments{"result"}):
onLog(log)
let id = await subscriptions.client.eth_subscribe("logs", filter)
subscriptions.callbacks[id] = callback
return id
method unsubscribe*(subscriptions: WebSocketSubscriptions,
id: JsonNode)
{.async.} =
subscriptions.callbacks.del(id)
discard await subscriptions.client.eth_unsubscribe(id)
# Polling
type
PollingSubscriptions* = ref object of JsonRpcSubscriptions
polling: Future[void]
# We need to keep around the filters that are used to create log filters on the RPC node
# as there might be a time when they need to be recreated as RPC node might prune/forget
# about them
filters: Table[JsonNode, EventFilter]
# Used when filters are recreated to translate from the id that user
# originally got returned to new filter id
subscriptionMapping: Table[JsonNode, JsonNode]
proc new*(_: type JsonRpcSubscriptions,
client: RpcHttpClient,
pollingInterval = 4.seconds): JsonRpcSubscriptions =
let subscriptions = PollingSubscriptions(client: client)
proc getChanges(originalId: JsonNode): Future[JsonNode] {.async.} =
try:
let mappedId = subscriptions.subscriptionMapping[originalId]
let changes = await subscriptions.client.eth_getFilterChanges(mappedId)
if changes.kind == JNull:
return newJArray()
elif changes.kind != JArray:
raise newException(SubscriptionError,
"HTTP polling: unexpected value returned from eth_getFilterChanges." &
" Expected: JArray, got: " & $changes.kind)
return changes
except CancelledError as e:
raise e
except CatchableError as e:
if "filter not found" in e.msg:
let filter = subscriptions.filters[originalId]
let newId = await subscriptions.client.eth_newFilter(filter)
subscriptions.subscriptionMapping[originalId] = newId
return await getChanges(originalId)
else:
raise e
proc poll(id: JsonNode) {.async.} =
for change in await getChanges(id):
if callback =? subscriptions.getCallback(id):
callback(id, change)
proc poll {.async.} =
untilCancelled:
for id in toSeq subscriptions.callbacks.keys:
await poll(id)
await sleepAsync(pollingInterval)
subscriptions.polling = poll()
subscriptions
method close*(subscriptions: PollingSubscriptions) {.async.} =
await subscriptions.polling.cancelAndWait()
await procCall JsonRpcSubscriptions(subscriptions).close()
method subscribeBlocks(subscriptions: PollingSubscriptions,
onBlock: BlockHandler):
Future[JsonNode]
{.async.} =
proc getBlock(hash: BlockHash) {.async.} =
try:
if blck =? (await subscriptions.client.eth_getBlockByHash(hash, false)):
onBlock(blck)
except CatchableError:
discard
proc callback(id, change: JsonNode) =
if hash =? BlockHash.fromJson(change):
asyncSpawn getBlock(hash)
let id = await subscriptions.client.eth_newBlockFilter()
subscriptions.callbacks[id] = callback
subscriptions.subscriptionMapping[id] = id
return id
method subscribeLogs(subscriptions: PollingSubscriptions,
filter: EventFilter,
onLog: LogHandler):
Future[JsonNode]
{.async.} =
proc callback(id, change: JsonNode) =
if log =? Log.fromJson(change):
onLog(log)
let id = await subscriptions.client.eth_newFilter(filter)
subscriptions.callbacks[id] = callback
subscriptions.filters[id] = filter
subscriptions.subscriptionMapping[id] = id
return id
method unsubscribe*(subscriptions: PollingSubscriptions,
id: JsonNode)
{.async.} =
subscriptions.filters.del(id)
subscriptions.callbacks.del(id)
let sub = subscriptions.subscriptionMapping[id]
subscriptions.subscriptionMapping.del(id)
try:
discard await subscriptions.client.eth_uninstallFilter(sub)
except CancelledError as e:
raise e
except CatchableError:
# Ignore if uninstallation of the filter fails. If it's the last step in our
# cleanup, then filter changes for this filter will no longer be polled so
# if the filter continues to live on in geth for whatever reason then it
# doesn't matter.
discard