Polling block subscriptions for non-websocket connections

This commit is contained in:
Mark Spanbroek 2023-06-27 14:10:12 +02:00 committed by markspanbroek
parent 127c9c9b0d
commit 6a034870f8
3 changed files with 70 additions and 2 deletions

View File

@ -4,6 +4,7 @@ proc eth_blockNumber: UInt256
proc eth_call(transaction: Transaction, blockTag: BlockTag): seq[byte]
proc eth_gasPrice(): UInt256
proc eth_getBlockByNumber(blockTag: BlockTag, includeTransactions: bool): ?Block
proc eth_getBlockByHash(hash: BlockHash, includeTransactions: bool): ?Block
proc eth_getTransactionCount(address: Address, blockTag: BlockTag): UInt256
proc eth_estimateGas(transaction: Transaction): UInt256
proc eth_chainId(): UInt256
@ -14,3 +15,6 @@ proc eth_sign(account: Address, message: seq[byte]): seq[byte]
proc eth_subscribe(name: string, filter: Filter): JsonNode
proc eth_subscribe(name: string): JsonNode
proc eth_unsubscribe(id: JsonNode): bool
proc eth_newBlockFilter(): JsonNode
proc eth_getFilterChanges(id: JsonNode): JsonNode
proc eth_uninstallFilter(id: JsonNode): bool

View File

@ -1,4 +1,5 @@
import std/tables
import std/sequtils
import pkg/chronos
import pkg/json_rpc/rpcclient
import ../../basics
@ -97,6 +98,48 @@ method unsubscribe(subscriptions: WebSocketSubscriptions,
type
PollingSubscriptions = ref object of JsonRpcSubscriptions
func new*(_: type JsonRpcSubscriptions,
proc new*(_: type JsonRpcSubscriptions,
client: RpcHttpClient): JsonRpcSubscriptions =
PollingSubscriptions(client: client)
let subscriptions = PollingSubscriptions(client: client)
proc poll(id: JsonNode) {.async.} =
for change in await subscriptions.client.eth_getFilterChanges(id):
if callback =? subscriptions.getCallback(id):
callback(id, change)
proc poll {.async.} =
try:
while true:
for id in toSeq subscriptions.callbacks.keys:
await poll(id)
await sleepAsync(1.seconds)
except CancelledError:
raise
asyncSpawn poll()
subscriptions
method subscribeBlocks(subscriptions: PollingSubscriptions,
onBlock: BlockHandler):
Future[JsonRpcSubscription]
{.async.} =
proc getBlock(hash: BlockHash) {.async.} =
if blck =? (await subscriptions.client.eth_getBlockByHash(hash, false)):
await onBlock(blck)
proc callback(id, change: JsonNode) =
if hash =? BlockHash.fromJson(change).catch:
asyncSpawn getBlock(hash)
let id = await subscriptions.client.eth_newBlockFilter()
subscriptions.callbacks[id] = callback
return JsonRpcSubscription(subscriptions: subscriptions, id: id)
method unsubscribe(subscriptions: PollingSubscriptions,
id: JsonNode)
{.async.} =
subscriptions.callbacks.del(id)
discard await subscriptions.client.eth_uninstallFilter(id)

View File

@ -36,3 +36,24 @@ suite "Web socket subscriptions":
check latestBlock.hash.isSome
check latestBlock.timestamp > 0.u256
await subscription.unsubscribe()
suite "HTTP polling subscriptions":
var subscriptions: JsonRpcSubscriptions
var client: RpcHttpClient
setup:
client = newRpcHttpClient()
await client.connect("http://localhost:8545")
subscriptions = JsonRpcSubscriptions.new(client)
test "subscribes to new blocks":
var latestBlock: Block
proc callback(blck: Block) {.async.} =
latestBlock = blck
let subscription = await subscriptions.subscribeBlocks(callback)
discard await client.call("evm_mine", newJArray())
check eventually(latestBlock.number.isSome)
check latestBlock.hash.isSome
check latestBlock.timestamp > 0.u256
await subscription.unsubscribe()