support async subscription event handlers

This commit is contained in:
Eric Mastro 2023-02-07 16:09:51 +11:00
parent e462649aec
commit a87b1b23a8
No known key found for this signature in database
GPG Key ID: 141E3048D95A4E63
4 changed files with 49 additions and 0 deletions

View File

@ -27,6 +27,7 @@ type
ContractError* = object of EthersError
Confirmable* = ?TransactionResponse
EventHandler*[E: Event] = proc(event: E) {.gcsafe, upraises:[].}
AsyncEventHandler*[E: Event] = proc(event: E): Future[void] {.gcsafe, upraises:[].}
func new*(ContractType: type Contract,
address: Address,
@ -215,3 +216,17 @@ proc subscribe*[E: Event](contract: Contract,
handler(event)
contract.provider.subscribe(filter, logHandler)
proc subscribe*[E: Event](contract: Contract,
_: type E,
handler: AsyncEventHandler[E]):
Future[Subscription] =
let topic = topic($E, E.fieldTypes).toArray
let filter = Filter(address: contract.address, topics: @[topic])
proc logHandler(log: Log) {.async, upraises: [].} =
if event =? E.decode(log.data, log.topics):
await handler(event)
contract.provider.subscribe(filter, logHandler)

View File

@ -41,6 +41,7 @@ type
cumulativeGasUsed*: UInt256
status*: TransactionStatus
LogHandler* = proc(log: Log) {.gcsafe, upraises:[].}
AsyncLogHandler* = proc(log: Log): Future[void] {.gcsafe, upraises:[].}
BlockHandler* = proc(blck: Block): Future[void] {.gcsafe, upraises:[].}
Topic* = array[32, byte]
Block* = object
@ -94,6 +95,12 @@ method subscribe*(provider: Provider,
Future[Subscription] {.base.} =
doAssert false, "not implemented"
method subscribe*(provider: Provider,
filter: Filter,
callback: AsyncLogHandler):
Future[Subscription] {.base.} =
doAssert false, "not implemented"
method subscribe*(provider: Provider,
callback: BlockHandler):
Future[Subscription] {.base.} =

View File

@ -195,6 +195,15 @@ method subscribe*(provider: JsonRpcProvider,
callback(log)
return await provider.subscribe("logs", filter.some, handler)
method subscribe*(provider: JsonRpcProvider,
filter: Filter,
callback: AsyncLogHandler):
Future[Subscription] {.async.} =
proc handler(id, arguments: JsonNode) {.async.} =
if log =? Log.fromJson(arguments["result"]).catch:
await callback(log)
return await provider.subscribe("logs", filter.some, handler)
method subscribe*(provider: JsonRpcProvider,
callback: BlockHandler):
Future[Subscription] {.async.} =

View File

@ -163,6 +163,24 @@ suite "Contracts":
check transfers == @[Transfer(receiver: accounts[0], value: 100.u256)]
test "supports async event handlers in subscriptions":
var transfers: seq[Transfer]
proc handleTransfer(transfer: Transfer) {.async.} =
await sleepAsync(1.milliseconds)
transfers.add(transfer)
let signer0 = provider.getSigner(accounts[0])
let subscription = await token.subscribe(Transfer, handleTransfer)
discard await token.connect(signer0).mint(accounts[0], 100.u256)
await subscription.unsubscribe()
check transfers == @[
Transfer(receiver: accounts[0], value: 100.u256)
]
test "can wait for contract interaction tx to be mined":
# must not be awaited so we can get newHeads inside of .wait
let futMined = provider.mineBlocks(10)