diff --git a/ethers/providers/jsonrpc/subscriptions.nim b/ethers/providers/jsonrpc/subscriptions.nim new file mode 100644 index 0000000..f7a2d8c --- /dev/null +++ b/ethers/providers/jsonrpc/subscriptions.nim @@ -0,0 +1,100 @@ +import std/tables +import pkg/chronos +import pkg/json_rpc/rpcclient +import ../../basics +import ../../provider +import ./rpccalls +import ./conversions + +type + JsonRpcSubscriptions* = ref object of RootObj + client: RpcClient + callbacks: Table[JsonNode, SubscriptionCallback] + JsonRpcSubscription = ref object of Subscription + subscriptions: JsonRpcSubscriptions + id: JsonNode + SubscriptionCallback = proc(id, arguments: JsonNode) {.gcsafe, upraises:[].} + +method subscribeBlocks*(subscriptions: JsonRpcSubscriptions, + onBlock: BlockHandler): + Future[JsonRpcSubscription] + {.async, base.} = + raiseAssert "not implemented" + +method subscribeLogs*(subscriptions: JsonRpcSubscriptions, + filter: Filter, + onLog: LogHandler): + Future[JsonRpcSubscription] + {.async, base.} = + raiseAssert "not implemented" + +method unsubscribe(subscriptions: JsonRpcSubscriptions, + id: JsonNode) + {.async, base.} = + raiseAssert "not implemented" + +method unsubscribe(subscription: JsonRpcSubscription) {.async.} = + await subscription.subscriptions.unsubscribe(subscription.id) + +proc getCallback(subscriptions: JsonRpcSubscriptions, + id: JsonNode): ?SubscriptionCallback = + try: + if subscriptions.callbacks.hasKey(id): + subscriptions.callbacks[id].some + else: + SubscriptionCallback.none + except Exception: + SubscriptionCallback.none + +# Web sockets + +type + WebSocketSubscriptions = ref object of JsonRpcSubscriptions + +method subscribeBlocks(subscriptions: WebSocketSubscriptions, + onBlock: BlockHandler): + Future[JsonRpcSubscription] + {.async.} = + proc callback(id, arguments: JsonNode) = + if blck =? Block.fromJson(arguments["result"]).catch: + asyncSpawn onBlock(blck) + let id = await subscriptions.client.eth_subscribe("newHeads") + subscriptions.callbacks[id] = callback + return JsonRpcSubscription(subscriptions: subscriptions, id: id) + +method subscribeLogs(subscriptions: WebSocketSubscriptions, + filter: Filter, + onLog: LogHandler): + Future[JsonRpcSubscription] + {.async.} = + proc callback(id, arguments: JsonNode) = + if log =? Log.fromJson(arguments["result"]).catch: + onLog(log) + let id = await subscriptions.client.eth_subscribe("logs", filter) + subscriptions.callbacks[id] = callback + return JsonRpcSubscription(subscriptions: subscriptions, id: id) + +method unsubscribe(subscriptions: WebSocketSubscriptions, + id: JsonNode) + {.async.} = + subscriptions.callbacks.del(id) + discard await subscriptions.client.eth_unsubscribe(id) + +proc new*(_: type JsonRpcSubscriptions, + client: RpcWebSocketClient): JsonRpcSubscriptions = + let subscriptions = WebSocketSubscriptions(client: client) + proc subscriptionHandler(arguments: JsonNode) {.upraises:[].} = + if id =? arguments["subscription"].catch and + callback =? subscriptions.getCallback(id): + callback(id, arguments) + client.setMethodHandler("eth_subscription", subscriptionHandler) + subscriptions + +# Polling + +type + PollingSubscriptions = ref object of JsonRpcSubscriptions + +func new*(_: type JsonRpcSubscriptions, + client: RpcHttpClient): JsonRpcSubscriptions = + PollingSubscriptions(client: client) diff --git a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim new file mode 100644 index 0000000..bff0d2a --- /dev/null +++ b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim @@ -0,0 +1,38 @@ +import std/json +import pkg/asynctest +import pkg/json_rpc/rpcclient +import ethers/provider +import ethers/providers/jsonrpc/subscriptions + +suite "JsonRpcSubscriptions": + + test "can be instantiated with an http client": + let client = newRpcHttpClient() + let subscriptions = JsonRpcSubscriptions.new(client) + check not isNil subscriptions + + test "can be instantiated with a websocket client": + let client = newRpcWebSocketClient() + let subscriptions = JsonRpcSubscriptions.new(client) + check not isNil subscriptions + +suite "Web socket subscriptions": + + var subscriptions: JsonRpcSubscriptions + var client: RpcWebSocketClient + + setup: + client = newRpcWebSocketClient() + await client.connect("ws://localhost:8545") + subscriptions = JsonRpcSubscriptions.new(client) + + test "subscribes to new blocks": + var latestBlock: Block + proc callback(blck: Block) {.async.} = + latestBlock = blck + let subscription = await subscriptions.subscribeBlocks(callback) + discard await client.call("evm_mine", newJArray()) + check eventually(latestBlock.number.isSome) + check latestBlock.hash.isSome + check latestBlock.timestamp > 0.u256 + await subscription.unsubscribe() diff --git a/testmodule/providers/testJsonRpc.nim b/testmodule/providers/testJsonRpc.nim index 4c7326e..f302669 100644 --- a/testmodule/providers/testJsonRpc.nim +++ b/testmodule/providers/testJsonRpc.nim @@ -1,4 +1,5 @@ import ./jsonrpc/testJsonRpcProvider import ./jsonrpc/testJsonRpcSigner +import ./jsonrpc/testJsonRpcSubscriptions {.warning[UnusedImport]:off.}