Subscribe to new blocks

This commit is contained in:
Mark Spanbroek 2022-05-16 14:51:39 +02:00 committed by markspanbroek
parent 82043c2fcc
commit bbf133725f
4 changed files with 38 additions and 8 deletions

View File

@ -18,6 +18,7 @@ type
data*: seq[byte]
topics*: seq[Topic]
LogHandler* = proc(log: Log) {.gcsafe, upraises:[].}
BlockHandler* = proc(blck: Block) {.gcsafe, upraises:[].}
Topic* = array[32, byte]
Block* = object
number*: UInt256
@ -57,5 +58,10 @@ method subscribe*(provider: Provider,
Future[Subscription] {.base.} =
doAssert false, "not implemented"
method subscribe*(provider: Provider,
callback: BlockHandler):
Future[Subscription] {.base.} =
doAssert false, "not implemented"
method unsubscribe*(subscription: Subscription) {.base, async.} =
doAssert false, "not implemented"

View File

@ -127,21 +127,34 @@ method getChainId*(provider: JsonRpcProvider): Future[UInt256] {.async.} =
except CatchableError:
return parse(await client.net_version(), UInt256)
proc subscribe(provider: JsonRpcProvider,
name: string,
filter: ?Filter,
handler: SubscriptionHandler): Future[Subscription] {.async.} =
let client = await provider.client
doAssert client of RpcWebSocketClient, "subscriptions require websockets"
let id = await client.eth_subscribe(name, filter)
provider.subscriptions[id] = handler
return JsonRpcSubscription(id: id, provider: provider)
method subscribe*(provider: JsonRpcProvider,
filter: Filter,
callback: LogHandler):
Future[Subscription] {.async.} =
let client = await provider.client
doAssert client of RpcWebSocketClient, "subscriptions require websockets"
proc handler(id, arguments: JsonNode) =
if log =? Log.fromJson(arguments["result"]).catch:
callback(log)
return await provider.subscribe("logs", filter.some, handler)
let id = await client.eth_subscribe("logs", some filter)
provider.subscriptions[id] = handler
return JsonRpcSubscription(id: id, provider: provider)
method subscribe*(provider: JsonRpcProvider,
callback: BlockHandler):
Future[Subscription] {.async.} =
proc handler(id, arguments: JsonNode) =
if blck =? Block.fromJson(arguments["result"]).catch:
callback(blck)
return await provider.subscribe("newHeads", Filter.none, handler)
method unsubscribe*(subscription: JsonRpcSubscription) {.async.} =
let provider = subscription.provider

View File

@ -9,5 +9,5 @@ proc eth_estimateGas(transaction: Transaction): UInt256
proc eth_chainId(): UInt256
proc eth_sendTransaction(transaction: Transaction): array[32, byte]
proc eth_sign(account: Address, message: seq[byte]): seq[byte]
proc eth_subscribe(name: string, filter: ?Filter): JsonNode
proc eth_subscribe(name: string, filter = Filter.none): JsonNode
proc eth_unsubscribe(id: JsonNode): bool

View File

@ -39,3 +39,14 @@ suite "JsonRpcProvider":
check block1.hash != block2.hash
check block1.number < block2.number
check block1.timestamp < block2.timestamp
test "subscribes to new blocks":
let oldBlock = !await provider.getBlock(BlockTag.latest)
var newBlock: Block
let blockHandler = proc(blck: Block) = newBlock = blck
let subscription = await provider.subscribe(blockHandler)
discard await provider.send("evm_mine")
check newBlock.number > oldBlock.number
check newBlock.timestamp > oldBlock.timestamp
check newBlock.hash != oldBlock.hash
await subscription.unsubscribe()