Generalize JSON-RPC subscription table

Allows for other types of subscriptions, not just logs.
This commit is contained in:
Mark Spanbroek 2022-05-16 14:40:30 +02:00 committed by markspanbroek
parent 8c45babcdf
commit 82043c2fcc

View File

@ -17,7 +17,7 @@ push: {.upraises: [].}
type
JsonRpcProvider* = ref object of Provider
client: Future[RpcClient]
subscriptions: Table[JsonNode, LogHandler]
subscriptions: Table[JsonNode, SubscriptionHandler]
JsonRpcSubscription = ref object of Subscription
provider: JsonRpcProvider
id: JsonNode
@ -25,6 +25,7 @@ type
provider: JsonRpcProvider
address: ?Address
JsonRpcProviderError* = object of EthersError
SubscriptionHandler = proc(id, arguments: JsonNode) {.gcsafe, upraises:[].}
template raiseProviderError(message: string) =
raise newException(JsonRpcProviderError, message)
@ -46,20 +47,19 @@ proc connect(_: type RpcClient, url: string): Future[RpcClient] {.async.} =
proc connect(provider: JsonRpcProvider, url: string) =
proc getLogHandler(subscriptionId: JsonNode): ?LogHandler =
proc getSubscriptionHandler(id: JsonNode): ?SubscriptionHandler =
try:
if provider.subscriptions.hasKey(subscriptionId):
provider.subscriptions[subscriptionId].some
if provider.subscriptions.hasKey(id):
provider.subscriptions[id].some
else:
LogHandler.none
SubscriptionHandler.none
except Exception:
LogHandler.none
SubscriptionHandler.none
proc handleSubscription(arguments: JsonNode) {.upraises: [].} =
if id =? arguments["subscription"].catch and
handler =? getLogHandler(id) and
log =? Log.fromJson(arguments["result"]).catch:
handler(log)
handler =? getSubscriptionHandler(id):
handler(id, arguments)
proc subscribe: Future[RpcClient] {.async.} =
let client = await RpcClient.connect(url)
@ -133,8 +133,14 @@ method subscribe*(provider: JsonRpcProvider,
Future[Subscription] {.async.} =
let client = await provider.client
doAssert client of RpcWebSocketClient, "subscriptions require websockets"
proc handler(id, arguments: JsonNode) =
if log =? Log.fromJson(arguments["result"]).catch:
callback(log)
let id = await client.eth_subscribe("logs", some filter)
provider.subscriptions[id] = callback
provider.subscriptions[id] = handler
return JsonRpcSubscription(id: id, provider: provider)
method unsubscribe*(subscription: JsonRpcSubscription) {.async.} =