diff --git a/ethers/providers/jsonrpc/signatures.nim b/ethers/providers/jsonrpc/signatures.nim index 8738615..99cfc98 100644 --- a/ethers/providers/jsonrpc/signatures.nim +++ b/ethers/providers/jsonrpc/signatures.nim @@ -4,6 +4,7 @@ proc eth_blockNumber: UInt256 proc eth_call(transaction: Transaction, blockTag: BlockTag): seq[byte] proc eth_gasPrice(): UInt256 proc eth_getBlockByNumber(blockTag: BlockTag, includeTransactions: bool): ?Block +proc eth_getBlockByHash(hash: BlockHash, includeTransactions: bool): ?Block proc eth_getTransactionCount(address: Address, blockTag: BlockTag): UInt256 proc eth_estimateGas(transaction: Transaction): UInt256 proc eth_chainId(): UInt256 @@ -14,3 +15,6 @@ proc eth_sign(account: Address, message: seq[byte]): seq[byte] proc eth_subscribe(name: string, filter: Filter): JsonNode proc eth_subscribe(name: string): JsonNode proc eth_unsubscribe(id: JsonNode): bool +proc eth_newBlockFilter(): JsonNode +proc eth_getFilterChanges(id: JsonNode): JsonNode +proc eth_uninstallFilter(id: JsonNode): bool diff --git a/ethers/providers/jsonrpc/subscriptions.nim b/ethers/providers/jsonrpc/subscriptions.nim index a6034e7..6855a7e 100644 --- a/ethers/providers/jsonrpc/subscriptions.nim +++ b/ethers/providers/jsonrpc/subscriptions.nim @@ -1,4 +1,5 @@ import std/tables +import std/sequtils import pkg/chronos import pkg/json_rpc/rpcclient import ../../basics @@ -97,6 +98,48 @@ method unsubscribe(subscriptions: WebSocketSubscriptions, type PollingSubscriptions = ref object of JsonRpcSubscriptions -func new*(_: type JsonRpcSubscriptions, +proc new*(_: type JsonRpcSubscriptions, client: RpcHttpClient): JsonRpcSubscriptions = - PollingSubscriptions(client: client) + + let subscriptions = PollingSubscriptions(client: client) + + proc poll(id: JsonNode) {.async.} = + for change in await subscriptions.client.eth_getFilterChanges(id): + if callback =? subscriptions.getCallback(id): + callback(id, change) + + proc poll {.async.} = + try: + while true: + for id in toSeq subscriptions.callbacks.keys: + await poll(id) + await sleepAsync(1.seconds) + except CancelledError: + raise + + asyncSpawn poll() + + subscriptions + +method subscribeBlocks(subscriptions: PollingSubscriptions, + onBlock: BlockHandler): + Future[JsonRpcSubscription] + {.async.} = + + proc getBlock(hash: BlockHash) {.async.} = + if blck =? (await subscriptions.client.eth_getBlockByHash(hash, false)): + await onBlock(blck) + + proc callback(id, change: JsonNode) = + if hash =? BlockHash.fromJson(change).catch: + asyncSpawn getBlock(hash) + + let id = await subscriptions.client.eth_newBlockFilter() + subscriptions.callbacks[id] = callback + return JsonRpcSubscription(subscriptions: subscriptions, id: id) + +method unsubscribe(subscriptions: PollingSubscriptions, + id: JsonNode) + {.async.} = + subscriptions.callbacks.del(id) + discard await subscriptions.client.eth_uninstallFilter(id) diff --git a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim index bff0d2a..83081ba 100644 --- a/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim +++ b/testmodule/providers/jsonrpc/testJsonRpcSubscriptions.nim @@ -36,3 +36,24 @@ suite "Web socket subscriptions": check latestBlock.hash.isSome check latestBlock.timestamp > 0.u256 await subscription.unsubscribe() + +suite "HTTP polling subscriptions": + + var subscriptions: JsonRpcSubscriptions + var client: RpcHttpClient + + setup: + client = newRpcHttpClient() + await client.connect("http://localhost:8545") + subscriptions = JsonRpcSubscriptions.new(client) + + test "subscribes to new blocks": + var latestBlock: Block + proc callback(blck: Block) {.async.} = + latestBlock = blck + let subscription = await subscriptions.subscribeBlocks(callback) + discard await client.call("evm_mine", newJArray()) + check eventually(latestBlock.number.isSome) + check latestBlock.hash.isSome + check latestBlock.timestamp > 0.u256 + await subscription.unsubscribe()