Move subscription handling to its own module
This commit is contained in:
parent
67c2d631d7
commit
a7dc0ac9eb
|
@ -0,0 +1,100 @@
|
|||
import std/tables
|
||||
import pkg/chronos
|
||||
import pkg/json_rpc/rpcclient
|
||||
import ../../basics
|
||||
import ../../provider
|
||||
import ./rpccalls
|
||||
import ./conversions
|
||||
|
||||
type
|
||||
JsonRpcSubscriptions* = ref object of RootObj
|
||||
client: RpcClient
|
||||
callbacks: Table[JsonNode, SubscriptionCallback]
|
||||
JsonRpcSubscription = ref object of Subscription
|
||||
subscriptions: JsonRpcSubscriptions
|
||||
id: JsonNode
|
||||
SubscriptionCallback = proc(id, arguments: JsonNode) {.gcsafe, upraises:[].}
|
||||
|
||||
method subscribeBlocks*(subscriptions: JsonRpcSubscriptions,
|
||||
onBlock: BlockHandler):
|
||||
Future[JsonRpcSubscription]
|
||||
{.async, base.} =
|
||||
raiseAssert "not implemented"
|
||||
|
||||
method subscribeLogs*(subscriptions: JsonRpcSubscriptions,
|
||||
filter: Filter,
|
||||
onLog: LogHandler):
|
||||
Future[JsonRpcSubscription]
|
||||
{.async, base.} =
|
||||
raiseAssert "not implemented"
|
||||
|
||||
method unsubscribe(subscriptions: JsonRpcSubscriptions,
|
||||
id: JsonNode)
|
||||
{.async, base.} =
|
||||
raiseAssert "not implemented"
|
||||
|
||||
method unsubscribe(subscription: JsonRpcSubscription) {.async.} =
|
||||
await subscription.subscriptions.unsubscribe(subscription.id)
|
||||
|
||||
proc getCallback(subscriptions: JsonRpcSubscriptions,
|
||||
id: JsonNode): ?SubscriptionCallback =
|
||||
try:
|
||||
if subscriptions.callbacks.hasKey(id):
|
||||
subscriptions.callbacks[id].some
|
||||
else:
|
||||
SubscriptionCallback.none
|
||||
except Exception:
|
||||
SubscriptionCallback.none
|
||||
|
||||
# Web sockets
|
||||
|
||||
type
|
||||
WebSocketSubscriptions = ref object of JsonRpcSubscriptions
|
||||
|
||||
method subscribeBlocks(subscriptions: WebSocketSubscriptions,
|
||||
onBlock: BlockHandler):
|
||||
Future[JsonRpcSubscription]
|
||||
{.async.} =
|
||||
proc callback(id, arguments: JsonNode) =
|
||||
if blck =? Block.fromJson(arguments["result"]).catch:
|
||||
asyncSpawn onBlock(blck)
|
||||
let id = await subscriptions.client.eth_subscribe("newHeads")
|
||||
subscriptions.callbacks[id] = callback
|
||||
return JsonRpcSubscription(subscriptions: subscriptions, id: id)
|
||||
|
||||
method subscribeLogs(subscriptions: WebSocketSubscriptions,
|
||||
filter: Filter,
|
||||
onLog: LogHandler):
|
||||
Future[JsonRpcSubscription]
|
||||
{.async.} =
|
||||
proc callback(id, arguments: JsonNode) =
|
||||
if log =? Log.fromJson(arguments["result"]).catch:
|
||||
onLog(log)
|
||||
let id = await subscriptions.client.eth_subscribe("logs", filter)
|
||||
subscriptions.callbacks[id] = callback
|
||||
return JsonRpcSubscription(subscriptions: subscriptions, id: id)
|
||||
|
||||
method unsubscribe(subscriptions: WebSocketSubscriptions,
|
||||
id: JsonNode)
|
||||
{.async.} =
|
||||
subscriptions.callbacks.del(id)
|
||||
discard await subscriptions.client.eth_unsubscribe(id)
|
||||
|
||||
proc new*(_: type JsonRpcSubscriptions,
|
||||
client: RpcWebSocketClient): JsonRpcSubscriptions =
|
||||
let subscriptions = WebSocketSubscriptions(client: client)
|
||||
proc subscriptionHandler(arguments: JsonNode) {.upraises:[].} =
|
||||
if id =? arguments["subscription"].catch and
|
||||
callback =? subscriptions.getCallback(id):
|
||||
callback(id, arguments)
|
||||
client.setMethodHandler("eth_subscription", subscriptionHandler)
|
||||
subscriptions
|
||||
|
||||
# Polling
|
||||
|
||||
type
|
||||
PollingSubscriptions = ref object of JsonRpcSubscriptions
|
||||
|
||||
func new*(_: type JsonRpcSubscriptions,
|
||||
client: RpcHttpClient): JsonRpcSubscriptions =
|
||||
PollingSubscriptions(client: client)
|
|
@ -0,0 +1,38 @@
|
|||
import std/json
|
||||
import pkg/asynctest
|
||||
import pkg/json_rpc/rpcclient
|
||||
import ethers/provider
|
||||
import ethers/providers/jsonrpc/subscriptions
|
||||
|
||||
suite "JsonRpcSubscriptions":
|
||||
|
||||
test "can be instantiated with an http client":
|
||||
let client = newRpcHttpClient()
|
||||
let subscriptions = JsonRpcSubscriptions.new(client)
|
||||
check not isNil subscriptions
|
||||
|
||||
test "can be instantiated with a websocket client":
|
||||
let client = newRpcWebSocketClient()
|
||||
let subscriptions = JsonRpcSubscriptions.new(client)
|
||||
check not isNil subscriptions
|
||||
|
||||
suite "Web socket subscriptions":
|
||||
|
||||
var subscriptions: JsonRpcSubscriptions
|
||||
var client: RpcWebSocketClient
|
||||
|
||||
setup:
|
||||
client = newRpcWebSocketClient()
|
||||
await client.connect("ws://localhost:8545")
|
||||
subscriptions = JsonRpcSubscriptions.new(client)
|
||||
|
||||
test "subscribes to new blocks":
|
||||
var latestBlock: Block
|
||||
proc callback(blck: Block) {.async.} =
|
||||
latestBlock = blck
|
||||
let subscription = await subscriptions.subscribeBlocks(callback)
|
||||
discard await client.call("evm_mine", newJArray())
|
||||
check eventually(latestBlock.number.isSome)
|
||||
check latestBlock.hash.isSome
|
||||
check latestBlock.timestamp > 0.u256
|
||||
await subscription.unsubscribe()
|
|
@ -1,4 +1,5 @@
|
|||
import ./jsonrpc/testJsonRpcProvider
|
||||
import ./jsonrpc/testJsonRpcSigner
|
||||
import ./jsonrpc/testJsonRpcSubscriptions
|
||||
|
||||
{.warning[UnusedImport]:off.}
|
||||
|
|
Loading…
Reference in New Issue